ยปCore Development>Code coverage>Lib/test/test_socket.py

Python code coverage for Lib/test/test_socket.py

#countcontent
1n/aimport unittest
2n/afrom test import support
3n/a
4n/aimport errno
5n/aimport io
6n/aimport itertools
7n/aimport socket
8n/aimport select
9n/aimport tempfile
10n/aimport time
11n/aimport traceback
12n/aimport queue
13n/aimport sys
14n/aimport os
15n/aimport array
16n/aimport contextlib
17n/afrom weakref import proxy
18n/aimport signal
19n/aimport math
20n/aimport pickle
21n/aimport struct
22n/aimport random
23n/aimport string
24n/atry:
25n/a import multiprocessing
26n/aexcept ImportError:
27n/a multiprocessing = False
28n/atry:
29n/a import fcntl
30n/aexcept ImportError:
31n/a fcntl = None
32n/a
33n/aHOST = support.HOST
34n/aMSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') ## test unicode string and carriage return
35n/a
36n/atry:
37n/a import _thread as thread
38n/a import threading
39n/aexcept ImportError:
40n/a thread = None
41n/a threading = None
42n/atry:
43n/a import _socket
44n/aexcept ImportError:
45n/a _socket = None
46n/a
47n/a
48n/adef _have_socket_can():
49n/a """Check whether CAN sockets are supported on this host."""
50n/a try:
51n/a s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
52n/a except (AttributeError, OSError):
53n/a return False
54n/a else:
55n/a s.close()
56n/a return True
57n/a
58n/adef _have_socket_rds():
59n/a """Check whether RDS sockets are supported on this host."""
60n/a try:
61n/a s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
62n/a except (AttributeError, OSError):
63n/a return False
64n/a else:
65n/a s.close()
66n/a return True
67n/a
68n/adef _have_socket_alg():
69n/a """Check whether AF_ALG sockets are supported on this host."""
70n/a try:
71n/a s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
72n/a except (AttributeError, OSError):
73n/a return False
74n/a else:
75n/a s.close()
76n/a return True
77n/a
78n/aHAVE_SOCKET_CAN = _have_socket_can()
79n/a
80n/aHAVE_SOCKET_RDS = _have_socket_rds()
81n/a
82n/aHAVE_SOCKET_ALG = _have_socket_alg()
83n/a
84n/a# Size in bytes of the int type
85n/aSIZEOF_INT = array.array("i").itemsize
86n/a
87n/aclass SocketTCPTest(unittest.TestCase):
88n/a
89n/a def setUp(self):
90n/a self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
91n/a self.port = support.bind_port(self.serv)
92n/a self.serv.listen()
93n/a
94n/a def tearDown(self):
95n/a self.serv.close()
96n/a self.serv = None
97n/a
98n/aclass SocketUDPTest(unittest.TestCase):
99n/a
100n/a def setUp(self):
101n/a self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
102n/a self.port = support.bind_port(self.serv)
103n/a
104n/a def tearDown(self):
105n/a self.serv.close()
106n/a self.serv = None
107n/a
108n/aclass ThreadSafeCleanupTestCase(unittest.TestCase):
109n/a """Subclass of unittest.TestCase with thread-safe cleanup methods.
110n/a
111n/a This subclass protects the addCleanup() and doCleanups() methods
112n/a with a recursive lock.
113n/a """
114n/a
115n/a if threading:
116n/a def __init__(self, *args, **kwargs):
117n/a super().__init__(*args, **kwargs)
118n/a self._cleanup_lock = threading.RLock()
119n/a
120n/a def addCleanup(self, *args, **kwargs):
121n/a with self._cleanup_lock:
122n/a return super().addCleanup(*args, **kwargs)
123n/a
124n/a def doCleanups(self, *args, **kwargs):
125n/a with self._cleanup_lock:
126n/a return super().doCleanups(*args, **kwargs)
127n/a
128n/aclass SocketCANTest(unittest.TestCase):
129n/a
130n/a """To be able to run this test, a `vcan0` CAN interface can be created with
131n/a the following commands:
132n/a # modprobe vcan
133n/a # ip link add dev vcan0 type vcan
134n/a # ifconfig vcan0 up
135n/a """
136n/a interface = 'vcan0'
137n/a bufsize = 128
138n/a
139n/a """The CAN frame structure is defined in <linux/can.h>:
140n/a
141n/a struct can_frame {
142n/a canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
143n/a __u8 can_dlc; /* data length code: 0 .. 8 */
144n/a __u8 data[8] __attribute__((aligned(8)));
145n/a };
146n/a """
147n/a can_frame_fmt = "=IB3x8s"
148n/a can_frame_size = struct.calcsize(can_frame_fmt)
149n/a
150n/a """The Broadcast Management Command frame structure is defined
151n/a in <linux/can/bcm.h>:
152n/a
153n/a struct bcm_msg_head {
154n/a __u32 opcode;
155n/a __u32 flags;
156n/a __u32 count;
157n/a struct timeval ival1, ival2;
158n/a canid_t can_id;
159n/a __u32 nframes;
160n/a struct can_frame frames[0];
161n/a }
162n/a
163n/a `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
164n/a `struct can_frame` definition). Must use native not standard types for packing.
165n/a """
166n/a bcm_cmd_msg_fmt = "@3I4l2I"
167n/a bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8)
168n/a
169n/a def setUp(self):
170n/a self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
171n/a self.addCleanup(self.s.close)
172n/a try:
173n/a self.s.bind((self.interface,))
174n/a except OSError:
175n/a self.skipTest('network interface `%s` does not exist' %
176n/a self.interface)
177n/a
178n/a
179n/aclass SocketRDSTest(unittest.TestCase):
180n/a
181n/a """To be able to run this test, the `rds` kernel module must be loaded:
182n/a # modprobe rds
183n/a """
184n/a bufsize = 8192
185n/a
186n/a def setUp(self):
187n/a self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
188n/a self.addCleanup(self.serv.close)
189n/a try:
190n/a self.port = support.bind_port(self.serv)
191n/a except OSError:
192n/a self.skipTest('unable to bind RDS socket')
193n/a
194n/a
195n/aclass ThreadableTest:
196n/a """Threadable Test class
197n/a
198n/a The ThreadableTest class makes it easy to create a threaded
199n/a client/server pair from an existing unit test. To create a
200n/a new threaded class from an existing unit test, use multiple
201n/a inheritance:
202n/a
203n/a class NewClass (OldClass, ThreadableTest):
204n/a pass
205n/a
206n/a This class defines two new fixture functions with obvious
207n/a purposes for overriding:
208n/a
209n/a clientSetUp ()
210n/a clientTearDown ()
211n/a
212n/a Any new test functions within the class must then define
213n/a tests in pairs, where the test name is preceded with a
214n/a '_' to indicate the client portion of the test. Ex:
215n/a
216n/a def testFoo(self):
217n/a # Server portion
218n/a
219n/a def _testFoo(self):
220n/a # Client portion
221n/a
222n/a Any exceptions raised by the clients during their tests
223n/a are caught and transferred to the main thread to alert
224n/a the testing framework.
225n/a
226n/a Note, the server setup function cannot call any blocking
227n/a functions that rely on the client thread during setup,
228n/a unless serverExplicitReady() is called just before
229n/a the blocking call (such as in setting up a client/server
230n/a connection and performing the accept() in setUp().
231n/a """
232n/a
233n/a def __init__(self):
234n/a # Swap the true setup function
235n/a self.__setUp = self.setUp
236n/a self.__tearDown = self.tearDown
237n/a self.setUp = self._setUp
238n/a self.tearDown = self._tearDown
239n/a
240n/a def serverExplicitReady(self):
241n/a """This method allows the server to explicitly indicate that
242n/a it wants the client thread to proceed. This is useful if the
243n/a server is about to execute a blocking routine that is
244n/a dependent upon the client thread during its setup routine."""
245n/a self.server_ready.set()
246n/a
247n/a def _setUp(self):
248n/a self.server_ready = threading.Event()
249n/a self.client_ready = threading.Event()
250n/a self.done = threading.Event()
251n/a self.queue = queue.Queue(1)
252n/a self.server_crashed = False
253n/a
254n/a # Do some munging to start the client test.
255n/a methodname = self.id()
256n/a i = methodname.rfind('.')
257n/a methodname = methodname[i+1:]
258n/a test_method = getattr(self, '_' + methodname)
259n/a self.client_thread = thread.start_new_thread(
260n/a self.clientRun, (test_method,))
261n/a
262n/a try:
263n/a self.__setUp()
264n/a except:
265n/a self.server_crashed = True
266n/a raise
267n/a finally:
268n/a self.server_ready.set()
269n/a self.client_ready.wait()
270n/a
271n/a def _tearDown(self):
272n/a self.__tearDown()
273n/a self.done.wait()
274n/a
275n/a if self.queue.qsize():
276n/a exc = self.queue.get()
277n/a raise exc
278n/a
279n/a def clientRun(self, test_func):
280n/a self.server_ready.wait()
281n/a try:
282n/a self.clientSetUp()
283n/a except BaseException as e:
284n/a self.queue.put(e)
285n/a self.clientTearDown()
286n/a return
287n/a finally:
288n/a self.client_ready.set()
289n/a if self.server_crashed:
290n/a self.clientTearDown()
291n/a return
292n/a if not hasattr(test_func, '__call__'):
293n/a raise TypeError("test_func must be a callable function")
294n/a try:
295n/a test_func()
296n/a except BaseException as e:
297n/a self.queue.put(e)
298n/a finally:
299n/a self.clientTearDown()
300n/a
301n/a def clientSetUp(self):
302n/a raise NotImplementedError("clientSetUp must be implemented.")
303n/a
304n/a def clientTearDown(self):
305n/a self.done.set()
306n/a thread.exit()
307n/a
308n/aclass ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
309n/a
310n/a def __init__(self, methodName='runTest'):
311n/a SocketTCPTest.__init__(self, methodName=methodName)
312n/a ThreadableTest.__init__(self)
313n/a
314n/a def clientSetUp(self):
315n/a self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
316n/a
317n/a def clientTearDown(self):
318n/a self.cli.close()
319n/a self.cli = None
320n/a ThreadableTest.clientTearDown(self)
321n/a
322n/aclass ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
323n/a
324n/a def __init__(self, methodName='runTest'):
325n/a SocketUDPTest.__init__(self, methodName=methodName)
326n/a ThreadableTest.__init__(self)
327n/a
328n/a def clientSetUp(self):
329n/a self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
330n/a
331n/a def clientTearDown(self):
332n/a self.cli.close()
333n/a self.cli = None
334n/a ThreadableTest.clientTearDown(self)
335n/a
336n/aclass ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
337n/a
338n/a def __init__(self, methodName='runTest'):
339n/a SocketCANTest.__init__(self, methodName=methodName)
340n/a ThreadableTest.__init__(self)
341n/a
342n/a def clientSetUp(self):
343n/a self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
344n/a try:
345n/a self.cli.bind((self.interface,))
346n/a except OSError:
347n/a # skipTest should not be called here, and will be called in the
348n/a # server instead
349n/a pass
350n/a
351n/a def clientTearDown(self):
352n/a self.cli.close()
353n/a self.cli = None
354n/a ThreadableTest.clientTearDown(self)
355n/a
356n/aclass ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
357n/a
358n/a def __init__(self, methodName='runTest'):
359n/a SocketRDSTest.__init__(self, methodName=methodName)
360n/a ThreadableTest.__init__(self)
361n/a
362n/a def clientSetUp(self):
363n/a self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
364n/a try:
365n/a # RDS sockets must be bound explicitly to send or receive data
366n/a self.cli.bind((HOST, 0))
367n/a self.cli_addr = self.cli.getsockname()
368n/a except OSError:
369n/a # skipTest should not be called here, and will be called in the
370n/a # server instead
371n/a pass
372n/a
373n/a def clientTearDown(self):
374n/a self.cli.close()
375n/a self.cli = None
376n/a ThreadableTest.clientTearDown(self)
377n/a
378n/aclass SocketConnectedTest(ThreadedTCPSocketTest):
379n/a """Socket tests for client-server connection.
380n/a
381n/a self.cli_conn is a client socket connected to the server. The
382n/a setUp() method guarantees that it is connected to the server.
383n/a """
384n/a
385n/a def __init__(self, methodName='runTest'):
386n/a ThreadedTCPSocketTest.__init__(self, methodName=methodName)
387n/a
388n/a def setUp(self):
389n/a ThreadedTCPSocketTest.setUp(self)
390n/a # Indicate explicitly we're ready for the client thread to
391n/a # proceed and then perform the blocking call to accept
392n/a self.serverExplicitReady()
393n/a conn, addr = self.serv.accept()
394n/a self.cli_conn = conn
395n/a
396n/a def tearDown(self):
397n/a self.cli_conn.close()
398n/a self.cli_conn = None
399n/a ThreadedTCPSocketTest.tearDown(self)
400n/a
401n/a def clientSetUp(self):
402n/a ThreadedTCPSocketTest.clientSetUp(self)
403n/a self.cli.connect((HOST, self.port))
404n/a self.serv_conn = self.cli
405n/a
406n/a def clientTearDown(self):
407n/a self.serv_conn.close()
408n/a self.serv_conn = None
409n/a ThreadedTCPSocketTest.clientTearDown(self)
410n/a
411n/aclass SocketPairTest(unittest.TestCase, ThreadableTest):
412n/a
413n/a def __init__(self, methodName='runTest'):
414n/a unittest.TestCase.__init__(self, methodName=methodName)
415n/a ThreadableTest.__init__(self)
416n/a
417n/a def setUp(self):
418n/a self.serv, self.cli = socket.socketpair()
419n/a
420n/a def tearDown(self):
421n/a self.serv.close()
422n/a self.serv = None
423n/a
424n/a def clientSetUp(self):
425n/a pass
426n/a
427n/a def clientTearDown(self):
428n/a self.cli.close()
429n/a self.cli = None
430n/a ThreadableTest.clientTearDown(self)
431n/a
432n/a
433n/a# The following classes are used by the sendmsg()/recvmsg() tests.
434n/a# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
435n/a# gives a drop-in replacement for SocketConnectedTest, but different
436n/a# address families can be used, and the attributes serv_addr and
437n/a# cli_addr will be set to the addresses of the endpoints.
438n/a
439n/aclass SocketTestBase(unittest.TestCase):
440n/a """A base class for socket tests.
441n/a
442n/a Subclasses must provide methods newSocket() to return a new socket
443n/a and bindSock(sock) to bind it to an unused address.
444n/a
445n/a Creates a socket self.serv and sets self.serv_addr to its address.
446n/a """
447n/a
448n/a def setUp(self):
449n/a self.serv = self.newSocket()
450n/a self.bindServer()
451n/a
452n/a def bindServer(self):
453n/a """Bind server socket and set self.serv_addr to its address."""
454n/a self.bindSock(self.serv)
455n/a self.serv_addr = self.serv.getsockname()
456n/a
457n/a def tearDown(self):
458n/a self.serv.close()
459n/a self.serv = None
460n/a
461n/a
462n/aclass SocketListeningTestMixin(SocketTestBase):
463n/a """Mixin to listen on the server socket."""
464n/a
465n/a def setUp(self):
466n/a super().setUp()
467n/a self.serv.listen()
468n/a
469n/a
470n/aclass ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
471n/a ThreadableTest):
472n/a """Mixin to add client socket and allow client/server tests.
473n/a
474n/a Client socket is self.cli and its address is self.cli_addr. See
475n/a ThreadableTest for usage information.
476n/a """
477n/a
478n/a def __init__(self, *args, **kwargs):
479n/a super().__init__(*args, **kwargs)
480n/a ThreadableTest.__init__(self)
481n/a
482n/a def clientSetUp(self):
483n/a self.cli = self.newClientSocket()
484n/a self.bindClient()
485n/a
486n/a def newClientSocket(self):
487n/a """Return a new socket for use as client."""
488n/a return self.newSocket()
489n/a
490n/a def bindClient(self):
491n/a """Bind client socket and set self.cli_addr to its address."""
492n/a self.bindSock(self.cli)
493n/a self.cli_addr = self.cli.getsockname()
494n/a
495n/a def clientTearDown(self):
496n/a self.cli.close()
497n/a self.cli = None
498n/a ThreadableTest.clientTearDown(self)
499n/a
500n/a
501n/aclass ConnectedStreamTestMixin(SocketListeningTestMixin,
502n/a ThreadedSocketTestMixin):
503n/a """Mixin to allow client/server stream tests with connected client.
504n/a
505n/a Server's socket representing connection to client is self.cli_conn
506n/a and client's connection to server is self.serv_conn. (Based on
507n/a SocketConnectedTest.)
508n/a """
509n/a
510n/a def setUp(self):
511n/a super().setUp()
512n/a # Indicate explicitly we're ready for the client thread to
513n/a # proceed and then perform the blocking call to accept
514n/a self.serverExplicitReady()
515n/a conn, addr = self.serv.accept()
516n/a self.cli_conn = conn
517n/a
518n/a def tearDown(self):
519n/a self.cli_conn.close()
520n/a self.cli_conn = None
521n/a super().tearDown()
522n/a
523n/a def clientSetUp(self):
524n/a super().clientSetUp()
525n/a self.cli.connect(self.serv_addr)
526n/a self.serv_conn = self.cli
527n/a
528n/a def clientTearDown(self):
529n/a try:
530n/a self.serv_conn.close()
531n/a self.serv_conn = None
532n/a except AttributeError:
533n/a pass
534n/a super().clientTearDown()
535n/a
536n/a
537n/aclass UnixSocketTestBase(SocketTestBase):
538n/a """Base class for Unix-domain socket tests."""
539n/a
540n/a # This class is used for file descriptor passing tests, so we
541n/a # create the sockets in a private directory so that other users
542n/a # can't send anything that might be problematic for a privileged
543n/a # user running the tests.
544n/a
545n/a def setUp(self):
546n/a self.dir_path = tempfile.mkdtemp()
547n/a self.addCleanup(os.rmdir, self.dir_path)
548n/a super().setUp()
549n/a
550n/a def bindSock(self, sock):
551n/a path = tempfile.mktemp(dir=self.dir_path)
552n/a support.bind_unix_socket(sock, path)
553n/a self.addCleanup(support.unlink, path)
554n/a
555n/aclass UnixStreamBase(UnixSocketTestBase):
556n/a """Base class for Unix-domain SOCK_STREAM tests."""
557n/a
558n/a def newSocket(self):
559n/a return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
560n/a
561n/a
562n/aclass InetTestBase(SocketTestBase):
563n/a """Base class for IPv4 socket tests."""
564n/a
565n/a host = HOST
566n/a
567n/a def setUp(self):
568n/a super().setUp()
569n/a self.port = self.serv_addr[1]
570n/a
571n/a def bindSock(self, sock):
572n/a support.bind_port(sock, host=self.host)
573n/a
574n/aclass TCPTestBase(InetTestBase):
575n/a """Base class for TCP-over-IPv4 tests."""
576n/a
577n/a def newSocket(self):
578n/a return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
579n/a
580n/aclass UDPTestBase(InetTestBase):
581n/a """Base class for UDP-over-IPv4 tests."""
582n/a
583n/a def newSocket(self):
584n/a return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
585n/a
586n/aclass SCTPStreamBase(InetTestBase):
587n/a """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
588n/a
589n/a def newSocket(self):
590n/a return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
591n/a socket.IPPROTO_SCTP)
592n/a
593n/a
594n/aclass Inet6TestBase(InetTestBase):
595n/a """Base class for IPv6 socket tests."""
596n/a
597n/a host = support.HOSTv6
598n/a
599n/aclass UDP6TestBase(Inet6TestBase):
600n/a """Base class for UDP-over-IPv6 tests."""
601n/a
602n/a def newSocket(self):
603n/a return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
604n/a
605n/a
606n/a# Test-skipping decorators for use with ThreadableTest.
607n/a
608n/adef skipWithClientIf(condition, reason):
609n/a """Skip decorated test if condition is true, add client_skip decorator.
610n/a
611n/a If the decorated object is not a class, sets its attribute
612n/a "client_skip" to a decorator which will return an empty function
613n/a if the test is to be skipped, or the original function if it is
614n/a not. This can be used to avoid running the client part of a
615n/a skipped test when using ThreadableTest.
616n/a """
617n/a def client_pass(*args, **kwargs):
618n/a pass
619n/a def skipdec(obj):
620n/a retval = unittest.skip(reason)(obj)
621n/a if not isinstance(obj, type):
622n/a retval.client_skip = lambda f: client_pass
623n/a return retval
624n/a def noskipdec(obj):
625n/a if not (isinstance(obj, type) or hasattr(obj, "client_skip")):
626n/a obj.client_skip = lambda f: f
627n/a return obj
628n/a return skipdec if condition else noskipdec
629n/a
630n/a
631n/adef requireAttrs(obj, *attributes):
632n/a """Skip decorated test if obj is missing any of the given attributes.
633n/a
634n/a Sets client_skip attribute as skipWithClientIf() does.
635n/a """
636n/a missing = [name for name in attributes if not hasattr(obj, name)]
637n/a return skipWithClientIf(
638n/a missing, "don't have " + ", ".join(name for name in missing))
639n/a
640n/a
641n/adef requireSocket(*args):
642n/a """Skip decorated test if a socket cannot be created with given arguments.
643n/a
644n/a When an argument is given as a string, will use the value of that
645n/a attribute of the socket module, or skip the test if it doesn't
646n/a exist. Sets client_skip attribute as skipWithClientIf() does.
647n/a """
648n/a err = None
649n/a missing = [obj for obj in args if
650n/a isinstance(obj, str) and not hasattr(socket, obj)]
651n/a if missing:
652n/a err = "don't have " + ", ".join(name for name in missing)
653n/a else:
654n/a callargs = [getattr(socket, obj) if isinstance(obj, str) else obj
655n/a for obj in args]
656n/a try:
657n/a s = socket.socket(*callargs)
658n/a except OSError as e:
659n/a # XXX: check errno?
660n/a err = str(e)
661n/a else:
662n/a s.close()
663n/a return skipWithClientIf(
664n/a err is not None,
665n/a "can't create socket({0}): {1}".format(
666n/a ", ".join(str(o) for o in args), err))
667n/a
668n/a
669n/a#######################################################################
670n/a## Begin Tests
671n/a
672n/aclass GeneralModuleTests(unittest.TestCase):
673n/a
674n/a def test_SocketType_is_socketobject(self):
675n/a import _socket
676n/a self.assertTrue(socket.SocketType is _socket.socket)
677n/a s = socket.socket()
678n/a self.assertIsInstance(s, socket.SocketType)
679n/a s.close()
680n/a
681n/a def test_repr(self):
682n/a s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
683n/a with s:
684n/a self.assertIn('fd=%i' % s.fileno(), repr(s))
685n/a self.assertIn('family=%s' % socket.AF_INET, repr(s))
686n/a self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
687n/a self.assertIn('proto=0', repr(s))
688n/a self.assertNotIn('raddr', repr(s))
689n/a s.bind(('127.0.0.1', 0))
690n/a self.assertIn('laddr', repr(s))
691n/a self.assertIn(str(s.getsockname()), repr(s))
692n/a self.assertIn('[closed]', repr(s))
693n/a self.assertNotIn('laddr', repr(s))
694n/a
695n/a @unittest.skipUnless(_socket is not None, 'need _socket module')
696n/a def test_csocket_repr(self):
697n/a s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
698n/a try:
699n/a expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
700n/a % (s.fileno(), s.family, s.type, s.proto))
701n/a self.assertEqual(repr(s), expected)
702n/a finally:
703n/a s.close()
704n/a expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
705n/a % (s.family, s.type, s.proto))
706n/a self.assertEqual(repr(s), expected)
707n/a
708n/a def test_weakref(self):
709n/a s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
710n/a p = proxy(s)
711n/a self.assertEqual(p.fileno(), s.fileno())
712n/a s.close()
713n/a s = None
714n/a try:
715n/a p.fileno()
716n/a except ReferenceError:
717n/a pass
718n/a else:
719n/a self.fail('Socket proxy still exists')
720n/a
721n/a def testSocketError(self):
722n/a # Testing socket module exceptions
723n/a msg = "Error raising socket exception (%s)."
724n/a with self.assertRaises(OSError, msg=msg % 'OSError'):
725n/a raise OSError
726n/a with self.assertRaises(OSError, msg=msg % 'socket.herror'):
727n/a raise socket.herror
728n/a with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
729n/a raise socket.gaierror
730n/a
731n/a def testSendtoErrors(self):
732n/a # Testing that sendto doesn't mask failures. See #10169.
733n/a s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
734n/a self.addCleanup(s.close)
735n/a s.bind(('', 0))
736n/a sockname = s.getsockname()
737n/a # 2 args
738n/a with self.assertRaises(TypeError) as cm:
739n/a s.sendto('\u2620', sockname)
740n/a self.assertEqual(str(cm.exception),
741n/a "a bytes-like object is required, not 'str'")
742n/a with self.assertRaises(TypeError) as cm:
743n/a s.sendto(5j, sockname)
744n/a self.assertEqual(str(cm.exception),
745n/a "a bytes-like object is required, not 'complex'")
746n/a with self.assertRaises(TypeError) as cm:
747n/a s.sendto(b'foo', None)
748n/a self.assertIn('not NoneType',str(cm.exception))
749n/a # 3 args
750n/a with self.assertRaises(TypeError) as cm:
751n/a s.sendto('\u2620', 0, sockname)
752n/a self.assertEqual(str(cm.exception),
753n/a "a bytes-like object is required, not 'str'")
754n/a with self.assertRaises(TypeError) as cm:
755n/a s.sendto(5j, 0, sockname)
756n/a self.assertEqual(str(cm.exception),
757n/a "a bytes-like object is required, not 'complex'")
758n/a with self.assertRaises(TypeError) as cm:
759n/a s.sendto(b'foo', 0, None)
760n/a self.assertIn('not NoneType', str(cm.exception))
761n/a with self.assertRaises(TypeError) as cm:
762n/a s.sendto(b'foo', 'bar', sockname)
763n/a self.assertIn('an integer is required', str(cm.exception))
764n/a with self.assertRaises(TypeError) as cm:
765n/a s.sendto(b'foo', None, None)
766n/a self.assertIn('an integer is required', str(cm.exception))
767n/a # wrong number of args
768n/a with self.assertRaises(TypeError) as cm:
769n/a s.sendto(b'foo')
770n/a self.assertIn('(1 given)', str(cm.exception))
771n/a with self.assertRaises(TypeError) as cm:
772n/a s.sendto(b'foo', 0, sockname, 4)
773n/a self.assertIn('(4 given)', str(cm.exception))
774n/a
775n/a def testCrucialConstants(self):
776n/a # Testing for mission critical constants
777n/a socket.AF_INET
778n/a socket.SOCK_STREAM
779n/a socket.SOCK_DGRAM
780n/a socket.SOCK_RAW
781n/a socket.SOCK_RDM
782n/a socket.SOCK_SEQPACKET
783n/a socket.SOL_SOCKET
784n/a socket.SO_REUSEADDR
785n/a
786n/a def testHostnameRes(self):
787n/a # Testing hostname resolution mechanisms
788n/a hostname = socket.gethostname()
789n/a try:
790n/a ip = socket.gethostbyname(hostname)
791n/a except OSError:
792n/a # Probably name lookup wasn't set up right; skip this test
793n/a self.skipTest('name lookup failure')
794n/a self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
795n/a try:
796n/a hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
797n/a except OSError:
798n/a # Probably a similar problem as above; skip this test
799n/a self.skipTest('name lookup failure')
800n/a all_host_names = [hostname, hname] + aliases
801n/a fqhn = socket.getfqdn(ip)
802n/a if not fqhn in all_host_names:
803n/a self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
804n/a
805n/a def test_host_resolution(self):
806n/a for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
807n/a '1:1:1:1:1:1:1:1:1']:
808n/a self.assertRaises(OSError, socket.gethostbyname, addr)
809n/a self.assertRaises(OSError, socket.gethostbyaddr, addr)
810n/a
811n/a for addr in [support.HOST, '10.0.0.1', '255.255.255.255']:
812n/a self.assertEqual(socket.gethostbyname(addr), addr)
813n/a
814n/a # we don't test support.HOSTv6 because there's a chance it doesn't have
815n/a # a matching name entry (e.g. 'ip6-localhost')
816n/a for host in [support.HOST]:
817n/a self.assertIn(host, socket.gethostbyaddr(host)[2])
818n/a
819n/a @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
820n/a @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
821n/a def test_sethostname(self):
822n/a oldhn = socket.gethostname()
823n/a try:
824n/a socket.sethostname('new')
825n/a except OSError as e:
826n/a if e.errno == errno.EPERM:
827n/a self.skipTest("test should be run as root")
828n/a else:
829n/a raise
830n/a try:
831n/a # running test as root!
832n/a self.assertEqual(socket.gethostname(), 'new')
833n/a # Should work with bytes objects too
834n/a socket.sethostname(b'bar')
835n/a self.assertEqual(socket.gethostname(), 'bar')
836n/a finally:
837n/a socket.sethostname(oldhn)
838n/a
839n/a @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
840n/a 'socket.if_nameindex() not available.')
841n/a def testInterfaceNameIndex(self):
842n/a interfaces = socket.if_nameindex()
843n/a for index, name in interfaces:
844n/a self.assertIsInstance(index, int)
845n/a self.assertIsInstance(name, str)
846n/a # interface indices are non-zero integers
847n/a self.assertGreater(index, 0)
848n/a _index = socket.if_nametoindex(name)
849n/a self.assertIsInstance(_index, int)
850n/a self.assertEqual(index, _index)
851n/a _name = socket.if_indextoname(index)
852n/a self.assertIsInstance(_name, str)
853n/a self.assertEqual(name, _name)
854n/a
855n/a @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
856n/a 'socket.if_nameindex() not available.')
857n/a def testInvalidInterfaceNameIndex(self):
858n/a # test nonexistent interface index/name
859n/a self.assertRaises(OSError, socket.if_indextoname, 0)
860n/a self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
861n/a # test with invalid values
862n/a self.assertRaises(TypeError, socket.if_nametoindex, 0)
863n/a self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
864n/a
865n/a @unittest.skipUnless(hasattr(sys, 'getrefcount'),
866n/a 'test needs sys.getrefcount()')
867n/a def testRefCountGetNameInfo(self):
868n/a # Testing reference count for getnameinfo
869n/a try:
870n/a # On some versions, this loses a reference
871n/a orig = sys.getrefcount(__name__)
872n/a socket.getnameinfo(__name__,0)
873n/a except TypeError:
874n/a if sys.getrefcount(__name__) != orig:
875n/a self.fail("socket.getnameinfo loses a reference")
876n/a
877n/a def testInterpreterCrash(self):
878n/a # Making sure getnameinfo doesn't crash the interpreter
879n/a try:
880n/a # On some versions, this crashes the interpreter.
881n/a socket.getnameinfo(('x', 0, 0, 0), 0)
882n/a except OSError:
883n/a pass
884n/a
885n/a def testNtoH(self):
886n/a # This just checks that htons etc. are their own inverse,
887n/a # when looking at the lower 16 or 32 bits.
888n/a sizes = {socket.htonl: 32, socket.ntohl: 32,
889n/a socket.htons: 16, socket.ntohs: 16}
890n/a for func, size in sizes.items():
891n/a mask = (1<<size) - 1
892n/a for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
893n/a self.assertEqual(i & mask, func(func(i&mask)) & mask)
894n/a
895n/a swapped = func(mask)
896n/a self.assertEqual(swapped & mask, mask)
897n/a self.assertRaises(OverflowError, func, 1<<34)
898n/a
899n/a def testNtoHErrors(self):
900n/a import _testcapi
901n/a s_good_values = [0, 1, 2, 0xffff]
902n/a l_good_values = s_good_values + [0xffffffff]
903n/a l_bad_values = [-1, -2, 1<<32, 1<<1000]
904n/a s_bad_values = l_bad_values + [_testcapi.INT_MIN - 1,
905n/a _testcapi.INT_MAX + 1]
906n/a s_deprecated_values = [1<<16, _testcapi.INT_MAX]
907n/a for k in s_good_values:
908n/a socket.ntohs(k)
909n/a socket.htons(k)
910n/a for k in l_good_values:
911n/a socket.ntohl(k)
912n/a socket.htonl(k)
913n/a for k in s_bad_values:
914n/a self.assertRaises(OverflowError, socket.ntohs, k)
915n/a self.assertRaises(OverflowError, socket.htons, k)
916n/a for k in l_bad_values:
917n/a self.assertRaises(OverflowError, socket.ntohl, k)
918n/a self.assertRaises(OverflowError, socket.htonl, k)
919n/a for k in s_deprecated_values:
920n/a self.assertWarns(DeprecationWarning, socket.ntohs, k)
921n/a self.assertWarns(DeprecationWarning, socket.htons, k)
922n/a
923n/a def testGetServBy(self):
924n/a eq = self.assertEqual
925n/a # Find one service that exists, then check all the related interfaces.
926n/a # I've ordered this by protocols that have both a tcp and udp
927n/a # protocol, at least for modern Linuxes.
928n/a if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
929n/a or sys.platform in ('linux', 'darwin')):
930n/a # avoid the 'echo' service on this platform, as there is an
931n/a # assumption breaking non-standard port/protocol entry
932n/a services = ('daytime', 'qotd', 'domain')
933n/a else:
934n/a services = ('echo', 'daytime', 'domain')
935n/a for service in services:
936n/a try:
937n/a port = socket.getservbyname(service, 'tcp')
938n/a break
939n/a except OSError:
940n/a pass
941n/a else:
942n/a raise OSError
943n/a # Try same call with optional protocol omitted
944n/a # Issue #26936: Android getservbyname() was broken before API 23.
945n/a if (not hasattr(sys, 'getandroidapilevel') or
946n/a sys.getandroidapilevel() >= 23):
947n/a port2 = socket.getservbyname(service)
948n/a eq(port, port2)
949n/a # Try udp, but don't barf if it doesn't exist
950n/a try:
951n/a udpport = socket.getservbyname(service, 'udp')
952n/a except OSError:
953n/a udpport = None
954n/a else:
955n/a eq(udpport, port)
956n/a # Now make sure the lookup by port returns the same service name
957n/a # Issue #26936: Android getservbyport() is broken.
958n/a if not support.is_android:
959n/a eq(socket.getservbyport(port2), service)
960n/a eq(socket.getservbyport(port, 'tcp'), service)
961n/a if udpport is not None:
962n/a eq(socket.getservbyport(udpport, 'udp'), service)
963n/a # Make sure getservbyport does not accept out of range ports.
964n/a self.assertRaises(OverflowError, socket.getservbyport, -1)
965n/a self.assertRaises(OverflowError, socket.getservbyport, 65536)
966n/a
967n/a def testDefaultTimeout(self):
968n/a # Testing default timeout
969n/a # The default timeout should initially be None
970n/a self.assertEqual(socket.getdefaulttimeout(), None)
971n/a s = socket.socket()
972n/a self.assertEqual(s.gettimeout(), None)
973n/a s.close()
974n/a
975n/a # Set the default timeout to 10, and see if it propagates
976n/a socket.setdefaulttimeout(10)
977n/a self.assertEqual(socket.getdefaulttimeout(), 10)
978n/a s = socket.socket()
979n/a self.assertEqual(s.gettimeout(), 10)
980n/a s.close()
981n/a
982n/a # Reset the default timeout to None, and see if it propagates
983n/a socket.setdefaulttimeout(None)
984n/a self.assertEqual(socket.getdefaulttimeout(), None)
985n/a s = socket.socket()
986n/a self.assertEqual(s.gettimeout(), None)
987n/a s.close()
988n/a
989n/a # Check that setting it to an invalid value raises ValueError
990n/a self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
991n/a
992n/a # Check that setting it to an invalid type raises TypeError
993n/a self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
994n/a
995n/a @unittest.skipUnless(hasattr(socket, 'inet_aton'),
996n/a 'test needs socket.inet_aton()')
997n/a def testIPv4_inet_aton_fourbytes(self):
998n/a # Test that issue1008086 and issue767150 are fixed.
999n/a # It must return 4 bytes.
1000n/a self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1001n/a self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1002n/a
1003n/a @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1004n/a 'test needs socket.inet_pton()')
1005n/a def testIPv4toString(self):
1006n/a from socket import inet_aton as f, inet_pton, AF_INET
1007n/a g = lambda a: inet_pton(AF_INET, a)
1008n/a
1009n/a assertInvalid = lambda func,a: self.assertRaises(
1010n/a (OSError, ValueError), func, a
1011n/a )
1012n/a
1013n/a self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1014n/a self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1015n/a self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1016n/a self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1017n/a self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1018n/a assertInvalid(f, '0.0.0.')
1019n/a assertInvalid(f, '300.0.0.0')
1020n/a assertInvalid(f, 'a.0.0.0')
1021n/a assertInvalid(f, '1.2.3.4.5')
1022n/a assertInvalid(f, '::1')
1023n/a
1024n/a self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1025n/a self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1026n/a self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1027n/a self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1028n/a assertInvalid(g, '0.0.0.')
1029n/a assertInvalid(g, '300.0.0.0')
1030n/a assertInvalid(g, 'a.0.0.0')
1031n/a assertInvalid(g, '1.2.3.4.5')
1032n/a assertInvalid(g, '::1')
1033n/a
1034n/a @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1035n/a 'test needs socket.inet_pton()')
1036n/a def testIPv6toString(self):
1037n/a try:
1038n/a from socket import inet_pton, AF_INET6, has_ipv6
1039n/a if not has_ipv6:
1040n/a self.skipTest('IPv6 not available')
1041n/a except ImportError:
1042n/a self.skipTest('could not import needed symbols from socket')
1043n/a
1044n/a if sys.platform == "win32":
1045n/a try:
1046n/a inet_pton(AF_INET6, '::')
1047n/a except OSError as e:
1048n/a if e.winerror == 10022:
1049n/a self.skipTest('IPv6 might not be supported')
1050n/a
1051n/a f = lambda a: inet_pton(AF_INET6, a)
1052n/a assertInvalid = lambda a: self.assertRaises(
1053n/a (OSError, ValueError), f, a
1054n/a )
1055n/a
1056n/a self.assertEqual(b'\x00' * 16, f('::'))
1057n/a self.assertEqual(b'\x00' * 16, f('0::0'))
1058n/a self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1059n/a self.assertEqual(
1060n/a b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1061n/a f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1062n/a )
1063n/a self.assertEqual(
1064n/a b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1065n/a f('ad42:abc::127:0:254:2')
1066n/a )
1067n/a self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1068n/a assertInvalid('0x20::')
1069n/a assertInvalid(':::')
1070n/a assertInvalid('::0::')
1071n/a assertInvalid('1::abc::')
1072n/a assertInvalid('1::abc::def')
1073n/a assertInvalid('1:2:3:4:5:6:')
1074n/a assertInvalid('1:2:3:4:5:6')
1075n/a assertInvalid('1:2:3:4:5:6:7:8:')
1076n/a assertInvalid('1:2:3:4:5:6:7:8:0')
1077n/a
1078n/a self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1079n/a f('::254.42.23.64')
1080n/a )
1081n/a self.assertEqual(
1082n/a b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1083n/a f('42::a29b:254.42.23.64')
1084n/a )
1085n/a self.assertEqual(
1086n/a b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1087n/a f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1088n/a )
1089n/a assertInvalid('255.254.253.252')
1090n/a assertInvalid('1::260.2.3.0')
1091n/a assertInvalid('1::0.be.e.0')
1092n/a assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1093n/a assertInvalid('::1.2.3.4:0')
1094n/a assertInvalid('0.100.200.0:3:4:5:6:7:8')
1095n/a
1096n/a @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1097n/a 'test needs socket.inet_ntop()')
1098n/a def testStringToIPv4(self):
1099n/a from socket import inet_ntoa as f, inet_ntop, AF_INET
1100n/a g = lambda a: inet_ntop(AF_INET, a)
1101n/a assertInvalid = lambda func,a: self.assertRaises(
1102n/a (OSError, ValueError), func, a
1103n/a )
1104n/a
1105n/a self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1106n/a self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1107n/a self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1108n/a self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1109n/a assertInvalid(f, b'\x00' * 3)
1110n/a assertInvalid(f, b'\x00' * 5)
1111n/a assertInvalid(f, b'\x00' * 16)
1112n/a self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1113n/a
1114n/a self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1115n/a self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1116n/a self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1117n/a assertInvalid(g, b'\x00' * 3)
1118n/a assertInvalid(g, b'\x00' * 5)
1119n/a assertInvalid(g, b'\x00' * 16)
1120n/a self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1121n/a
1122n/a @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1123n/a 'test needs socket.inet_ntop()')
1124n/a def testStringToIPv6(self):
1125n/a try:
1126n/a from socket import inet_ntop, AF_INET6, has_ipv6
1127n/a if not has_ipv6:
1128n/a self.skipTest('IPv6 not available')
1129n/a except ImportError:
1130n/a self.skipTest('could not import needed symbols from socket')
1131n/a
1132n/a if sys.platform == "win32":
1133n/a try:
1134n/a inet_ntop(AF_INET6, b'\x00' * 16)
1135n/a except OSError as e:
1136n/a if e.winerror == 10022:
1137n/a self.skipTest('IPv6 might not be supported')
1138n/a
1139n/a f = lambda a: inet_ntop(AF_INET6, a)
1140n/a assertInvalid = lambda a: self.assertRaises(
1141n/a (OSError, ValueError), f, a
1142n/a )
1143n/a
1144n/a self.assertEqual('::', f(b'\x00' * 16))
1145n/a self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1146n/a self.assertEqual(
1147n/a 'aef:b01:506:1001:ffff:9997:55:170',
1148n/a f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1149n/a )
1150n/a self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1151n/a
1152n/a assertInvalid(b'\x12' * 15)
1153n/a assertInvalid(b'\x12' * 17)
1154n/a assertInvalid(b'\x12' * 4)
1155n/a
1156n/a # XXX The following don't test module-level functionality...
1157n/a
1158n/a def testSockName(self):
1159n/a # Testing getsockname()
1160n/a port = support.find_unused_port()
1161n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1162n/a self.addCleanup(sock.close)
1163n/a sock.bind(("0.0.0.0", port))
1164n/a name = sock.getsockname()
1165n/a # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1166n/a # it reasonable to get the host's addr in addition to 0.0.0.0.
1167n/a # At least for eCos. This is required for the S/390 to pass.
1168n/a try:
1169n/a my_ip_addr = socket.gethostbyname(socket.gethostname())
1170n/a except OSError:
1171n/a # Probably name lookup wasn't set up right; skip this test
1172n/a self.skipTest('name lookup failure')
1173n/a self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1174n/a self.assertEqual(name[1], port)
1175n/a
1176n/a def testGetSockOpt(self):
1177n/a # Testing getsockopt()
1178n/a # We know a socket should start without reuse==0
1179n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1180n/a self.addCleanup(sock.close)
1181n/a reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1182n/a self.assertFalse(reuse != 0, "initial mode is reuse")
1183n/a
1184n/a def testSetSockOpt(self):
1185n/a # Testing setsockopt()
1186n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1187n/a self.addCleanup(sock.close)
1188n/a sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1189n/a reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1190n/a self.assertFalse(reuse == 0, "failed to set reuse mode")
1191n/a
1192n/a def testSendAfterClose(self):
1193n/a # testing send() after close() with timeout
1194n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1195n/a sock.settimeout(1)
1196n/a sock.close()
1197n/a self.assertRaises(OSError, sock.send, b"spam")
1198n/a
1199n/a def testCloseException(self):
1200n/a sock = socket.socket()
1201n/a socket.socket(fileno=sock.fileno()).close()
1202n/a try:
1203n/a sock.close()
1204n/a except OSError as err:
1205n/a # Winsock apparently raises ENOTSOCK
1206n/a self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1207n/a else:
1208n/a self.fail("close() should raise EBADF/ENOTSOCK")
1209n/a
1210n/a def testNewAttributes(self):
1211n/a # testing .family, .type and .protocol
1212n/a
1213n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1214n/a self.assertEqual(sock.family, socket.AF_INET)
1215n/a if hasattr(socket, 'SOCK_CLOEXEC'):
1216n/a self.assertIn(sock.type,
1217n/a (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1218n/a socket.SOCK_STREAM))
1219n/a else:
1220n/a self.assertEqual(sock.type, socket.SOCK_STREAM)
1221n/a self.assertEqual(sock.proto, 0)
1222n/a sock.close()
1223n/a
1224n/a def test_getsockaddrarg(self):
1225n/a sock = socket.socket()
1226n/a self.addCleanup(sock.close)
1227n/a port = support.find_unused_port()
1228n/a big_port = port + 65536
1229n/a neg_port = port - 65536
1230n/a self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1231n/a self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1232n/a # Since find_unused_port() is inherently subject to race conditions, we
1233n/a # call it a couple times if necessary.
1234n/a for i in itertools.count():
1235n/a port = support.find_unused_port()
1236n/a try:
1237n/a sock.bind((HOST, port))
1238n/a except OSError as e:
1239n/a if e.errno != errno.EADDRINUSE or i == 5:
1240n/a raise
1241n/a else:
1242n/a break
1243n/a
1244n/a @unittest.skipUnless(os.name == "nt", "Windows specific")
1245n/a def test_sock_ioctl(self):
1246n/a self.assertTrue(hasattr(socket.socket, 'ioctl'))
1247n/a self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1248n/a self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1249n/a self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1250n/a self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1251n/a s = socket.socket()
1252n/a self.addCleanup(s.close)
1253n/a self.assertRaises(ValueError, s.ioctl, -1, None)
1254n/a s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1255n/a
1256n/a @unittest.skipUnless(os.name == "nt", "Windows specific")
1257n/a @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1258n/a 'Loopback fast path support required for this test')
1259n/a def test_sio_loopback_fast_path(self):
1260n/a s = socket.socket()
1261n/a self.addCleanup(s.close)
1262n/a try:
1263n/a s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1264n/a except OSError as exc:
1265n/a WSAEOPNOTSUPP = 10045
1266n/a if exc.winerror == WSAEOPNOTSUPP:
1267n/a self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1268n/a "doesn't implemented in this Windows version")
1269n/a raise
1270n/a self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1271n/a
1272n/a def testGetaddrinfo(self):
1273n/a try:
1274n/a socket.getaddrinfo('localhost', 80)
1275n/a except socket.gaierror as err:
1276n/a if err.errno == socket.EAI_SERVICE:
1277n/a # see http://bugs.python.org/issue1282647
1278n/a self.skipTest("buggy libc version")
1279n/a raise
1280n/a # len of every sequence is supposed to be == 5
1281n/a for info in socket.getaddrinfo(HOST, None):
1282n/a self.assertEqual(len(info), 5)
1283n/a # host can be a domain name, a string representation of an
1284n/a # IPv4/v6 address or None
1285n/a socket.getaddrinfo('localhost', 80)
1286n/a socket.getaddrinfo('127.0.0.1', 80)
1287n/a socket.getaddrinfo(None, 80)
1288n/a if support.IPV6_ENABLED:
1289n/a socket.getaddrinfo('::1', 80)
1290n/a # port can be a string service name such as "http", a numeric
1291n/a # port number or None
1292n/a # Issue #26936: Android getaddrinfo() was broken before API level 23.
1293n/a if (not hasattr(sys, 'getandroidapilevel') or
1294n/a sys.getandroidapilevel() >= 23):
1295n/a socket.getaddrinfo(HOST, "http")
1296n/a socket.getaddrinfo(HOST, 80)
1297n/a socket.getaddrinfo(HOST, None)
1298n/a # test family and socktype filters
1299n/a infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1300n/a for family, type, _, _, _ in infos:
1301n/a self.assertEqual(family, socket.AF_INET)
1302n/a self.assertEqual(str(family), 'AddressFamily.AF_INET')
1303n/a self.assertEqual(type, socket.SOCK_STREAM)
1304n/a self.assertEqual(str(type), 'SocketKind.SOCK_STREAM')
1305n/a infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1306n/a for _, socktype, _, _, _ in infos:
1307n/a self.assertEqual(socktype, socket.SOCK_STREAM)
1308n/a # test proto and flags arguments
1309n/a socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1310n/a socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1311n/a # a server willing to support both IPv4 and IPv6 will
1312n/a # usually do this
1313n/a socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1314n/a socket.AI_PASSIVE)
1315n/a # test keyword arguments
1316n/a a = socket.getaddrinfo(HOST, None)
1317n/a b = socket.getaddrinfo(host=HOST, port=None)
1318n/a self.assertEqual(a, b)
1319n/a a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1320n/a b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1321n/a self.assertEqual(a, b)
1322n/a a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1323n/a b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1324n/a self.assertEqual(a, b)
1325n/a a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1326n/a b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1327n/a self.assertEqual(a, b)
1328n/a a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1329n/a b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1330n/a self.assertEqual(a, b)
1331n/a a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1332n/a socket.AI_PASSIVE)
1333n/a b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1334n/a type=socket.SOCK_STREAM, proto=0,
1335n/a flags=socket.AI_PASSIVE)
1336n/a self.assertEqual(a, b)
1337n/a # Issue #6697.
1338n/a self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1339n/a
1340n/a # Issue 17269: test workaround for OS X platform bug segfault
1341n/a if hasattr(socket, 'AI_NUMERICSERV'):
1342n/a try:
1343n/a # The arguments here are undefined and the call may succeed
1344n/a # or fail. All we care here is that it doesn't segfault.
1345n/a socket.getaddrinfo("localhost", None, 0, 0, 0,
1346n/a socket.AI_NUMERICSERV)
1347n/a except socket.gaierror:
1348n/a pass
1349n/a
1350n/a def test_getnameinfo(self):
1351n/a # only IP addresses are allowed
1352n/a self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1353n/a
1354n/a @unittest.skipUnless(support.is_resource_enabled('network'),
1355n/a 'network is not enabled')
1356n/a def test_idna(self):
1357n/a # Check for internet access before running test
1358n/a # (issue #12804, issue #25138).
1359n/a with support.transient_internet('python.org'):
1360n/a socket.gethostbyname('python.org')
1361n/a
1362n/a # these should all be successful
1363n/a domain = 'รยธร‘ยรยฟร‘โ€นร‘โ€šรยฐรยฝรยธรยต.pythontest.net'
1364n/a socket.gethostbyname(domain)
1365n/a socket.gethostbyname_ex(domain)
1366n/a socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1367n/a # this may not work if the forward lookup choses the IPv6 address, as that doesn't
1368n/a # have a reverse entry yet
1369n/a # socket.gethostbyaddr('รยธร‘ยรยฟร‘โ€นร‘โ€šรยฐรยฝรยธรยต.python.org')
1370n/a
1371n/a def check_sendall_interrupted(self, with_timeout):
1372n/a # socketpair() is not strictly required, but it makes things easier.
1373n/a if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
1374n/a self.skipTest("signal.alarm and socket.socketpair required for this test")
1375n/a # Our signal handlers clobber the C errno by calling a math function
1376n/a # with an invalid domain value.
1377n/a def ok_handler(*args):
1378n/a self.assertRaises(ValueError, math.acosh, 0)
1379n/a def raising_handler(*args):
1380n/a self.assertRaises(ValueError, math.acosh, 0)
1381n/a 1 // 0
1382n/a c, s = socket.socketpair()
1383n/a old_alarm = signal.signal(signal.SIGALRM, raising_handler)
1384n/a try:
1385n/a if with_timeout:
1386n/a # Just above the one second minimum for signal.alarm
1387n/a c.settimeout(1.5)
1388n/a with self.assertRaises(ZeroDivisionError):
1389n/a signal.alarm(1)
1390n/a c.sendall(b"x" * support.SOCK_MAX_SIZE)
1391n/a if with_timeout:
1392n/a signal.signal(signal.SIGALRM, ok_handler)
1393n/a signal.alarm(1)
1394n/a self.assertRaises(socket.timeout, c.sendall,
1395n/a b"x" * support.SOCK_MAX_SIZE)
1396n/a finally:
1397n/a signal.alarm(0)
1398n/a signal.signal(signal.SIGALRM, old_alarm)
1399n/a c.close()
1400n/a s.close()
1401n/a
1402n/a def test_sendall_interrupted(self):
1403n/a self.check_sendall_interrupted(False)
1404n/a
1405n/a def test_sendall_interrupted_with_timeout(self):
1406n/a self.check_sendall_interrupted(True)
1407n/a
1408n/a def test_dealloc_warn(self):
1409n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1410n/a r = repr(sock)
1411n/a with self.assertWarns(ResourceWarning) as cm:
1412n/a sock = None
1413n/a support.gc_collect()
1414n/a self.assertIn(r, str(cm.warning.args[0]))
1415n/a # An open socket file object gets dereferenced after the socket
1416n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1417n/a f = sock.makefile('rb')
1418n/a r = repr(sock)
1419n/a sock = None
1420n/a support.gc_collect()
1421n/a with self.assertWarns(ResourceWarning):
1422n/a f = None
1423n/a support.gc_collect()
1424n/a
1425n/a def test_name_closed_socketio(self):
1426n/a with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1427n/a fp = sock.makefile("rb")
1428n/a fp.close()
1429n/a self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1430n/a
1431n/a def test_unusable_closed_socketio(self):
1432n/a with socket.socket() as sock:
1433n/a fp = sock.makefile("rb", buffering=0)
1434n/a self.assertTrue(fp.readable())
1435n/a self.assertFalse(fp.writable())
1436n/a self.assertFalse(fp.seekable())
1437n/a fp.close()
1438n/a self.assertRaises(ValueError, fp.readable)
1439n/a self.assertRaises(ValueError, fp.writable)
1440n/a self.assertRaises(ValueError, fp.seekable)
1441n/a
1442n/a def test_makefile_mode(self):
1443n/a for mode in 'r', 'rb', 'rw', 'w', 'wb':
1444n/a with self.subTest(mode=mode):
1445n/a with socket.socket() as sock:
1446n/a with sock.makefile(mode) as fp:
1447n/a self.assertEqual(fp.mode, mode)
1448n/a
1449n/a def test_makefile_invalid_mode(self):
1450n/a for mode in 'rt', 'x', '+', 'a':
1451n/a with self.subTest(mode=mode):
1452n/a with socket.socket() as sock:
1453n/a with self.assertRaisesRegex(ValueError, 'invalid mode'):
1454n/a sock.makefile(mode)
1455n/a
1456n/a def test_pickle(self):
1457n/a sock = socket.socket()
1458n/a with sock:
1459n/a for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1460n/a self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1461n/a for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1462n/a family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1463n/a self.assertEqual(family, socket.AF_INET)
1464n/a type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1465n/a self.assertEqual(type, socket.SOCK_STREAM)
1466n/a
1467n/a def test_listen_backlog(self):
1468n/a for backlog in 0, -1:
1469n/a with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1470n/a srv.bind((HOST, 0))
1471n/a srv.listen(backlog)
1472n/a
1473n/a with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1474n/a srv.bind((HOST, 0))
1475n/a srv.listen()
1476n/a
1477n/a @support.cpython_only
1478n/a def test_listen_backlog_overflow(self):
1479n/a # Issue 15989
1480n/a import _testcapi
1481n/a srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1482n/a srv.bind((HOST, 0))
1483n/a self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1484n/a srv.close()
1485n/a
1486n/a @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
1487n/a def test_flowinfo(self):
1488n/a self.assertRaises(OverflowError, socket.getnameinfo,
1489n/a (support.HOSTv6, 0, 0xffffffff), 0)
1490n/a with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1491n/a self.assertRaises(OverflowError, s.bind, (support.HOSTv6, 0, -10))
1492n/a
1493n/a def test_str_for_enums(self):
1494n/a # Make sure that the AF_* and SOCK_* constants have enum-like string
1495n/a # reprs.
1496n/a with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1497n/a self.assertEqual(str(s.family), 'AddressFamily.AF_INET')
1498n/a self.assertEqual(str(s.type), 'SocketKind.SOCK_STREAM')
1499n/a
1500n/a @unittest.skipIf(os.name == 'nt', 'Will not work on Windows')
1501n/a def test_uknown_socket_family_repr(self):
1502n/a # Test that when created with a family that's not one of the known
1503n/a # AF_*/SOCK_* constants, socket.family just returns the number.
1504n/a #
1505n/a # To do this we fool socket.socket into believing it already has an
1506n/a # open fd because on this path it doesn't actually verify the family and
1507n/a # type and populates the socket object.
1508n/a #
1509n/a # On Windows this trick won't work, so the test is skipped.
1510n/a fd, path = tempfile.mkstemp()
1511n/a self.addCleanup(os.unlink, path)
1512n/a with socket.socket(family=42424, type=13331, fileno=fd) as s:
1513n/a self.assertEqual(s.family, 42424)
1514n/a self.assertEqual(s.type, 13331)
1515n/a
1516n/a @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1517n/a def test__sendfile_use_sendfile(self):
1518n/a class File:
1519n/a def __init__(self, fd):
1520n/a self.fd = fd
1521n/a
1522n/a def fileno(self):
1523n/a return self.fd
1524n/a with socket.socket() as sock:
1525n/a fd = os.open(os.curdir, os.O_RDONLY)
1526n/a os.close(fd)
1527n/a with self.assertRaises(socket._GiveupOnSendfile):
1528n/a sock._sendfile_use_sendfile(File(fd))
1529n/a with self.assertRaises(OverflowError):
1530n/a sock._sendfile_use_sendfile(File(2**1000))
1531n/a with self.assertRaises(TypeError):
1532n/a sock._sendfile_use_sendfile(File(None))
1533n/a
1534n/a
1535n/a@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1536n/aclass BasicCANTest(unittest.TestCase):
1537n/a
1538n/a def testCrucialConstants(self):
1539n/a socket.AF_CAN
1540n/a socket.PF_CAN
1541n/a socket.CAN_RAW
1542n/a
1543n/a @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1544n/a 'socket.CAN_BCM required for this test.')
1545n/a def testBCMConstants(self):
1546n/a socket.CAN_BCM
1547n/a
1548n/a # opcodes
1549n/a socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task
1550n/a socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task
1551n/a socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task
1552n/a socket.CAN_BCM_TX_SEND # send one CAN frame
1553n/a socket.CAN_BCM_RX_SETUP # create RX content filter subscription
1554n/a socket.CAN_BCM_RX_DELETE # remove RX content filter subscription
1555n/a socket.CAN_BCM_RX_READ # read properties of RX content filter subscription
1556n/a socket.CAN_BCM_TX_STATUS # reply to TX_READ request
1557n/a socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0)
1558n/a socket.CAN_BCM_RX_STATUS # reply to RX_READ request
1559n/a socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent
1560n/a socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change)
1561n/a
1562n/a def testCreateSocket(self):
1563n/a with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1564n/a pass
1565n/a
1566n/a @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1567n/a 'socket.CAN_BCM required for this test.')
1568n/a def testCreateBCMSocket(self):
1569n/a with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s:
1570n/a pass
1571n/a
1572n/a def testBindAny(self):
1573n/a with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1574n/a s.bind(('', ))
1575n/a
1576n/a def testTooLongInterfaceName(self):
1577n/a # most systems limit IFNAMSIZ to 16, take 1024 to be sure
1578n/a with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1579n/a self.assertRaisesRegex(OSError, 'interface name too long',
1580n/a s.bind, ('x' * 1024,))
1581n/a
1582n/a @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
1583n/a 'socket.CAN_RAW_LOOPBACK required for this test.')
1584n/a def testLoopback(self):
1585n/a with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1586n/a for loopback in (0, 1):
1587n/a s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
1588n/a loopback)
1589n/a self.assertEqual(loopback,
1590n/a s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
1591n/a
1592n/a @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
1593n/a 'socket.CAN_RAW_FILTER required for this test.')
1594n/a def testFilter(self):
1595n/a can_id, can_mask = 0x200, 0x700
1596n/a can_filter = struct.pack("=II", can_id, can_mask)
1597n/a with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
1598n/a s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
1599n/a self.assertEqual(can_filter,
1600n/a s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
1601n/a s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter))
1602n/a
1603n/a
1604n/a@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
1605n/a@unittest.skipUnless(thread, 'Threading required for this test.')
1606n/aclass CANTest(ThreadedCANSocketTest):
1607n/a
1608n/a def __init__(self, methodName='runTest'):
1609n/a ThreadedCANSocketTest.__init__(self, methodName=methodName)
1610n/a
1611n/a @classmethod
1612n/a def build_can_frame(cls, can_id, data):
1613n/a """Build a CAN frame."""
1614n/a can_dlc = len(data)
1615n/a data = data.ljust(8, b'\x00')
1616n/a return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
1617n/a
1618n/a @classmethod
1619n/a def dissect_can_frame(cls, frame):
1620n/a """Dissect a CAN frame."""
1621n/a can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
1622n/a return (can_id, can_dlc, data[:can_dlc])
1623n/a
1624n/a def testSendFrame(self):
1625n/a cf, addr = self.s.recvfrom(self.bufsize)
1626n/a self.assertEqual(self.cf, cf)
1627n/a self.assertEqual(addr[0], self.interface)
1628n/a self.assertEqual(addr[1], socket.AF_CAN)
1629n/a
1630n/a def _testSendFrame(self):
1631n/a self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
1632n/a self.cli.send(self.cf)
1633n/a
1634n/a def testSendMaxFrame(self):
1635n/a cf, addr = self.s.recvfrom(self.bufsize)
1636n/a self.assertEqual(self.cf, cf)
1637n/a
1638n/a def _testSendMaxFrame(self):
1639n/a self.cf = self.build_can_frame(0x00, b'\x07' * 8)
1640n/a self.cli.send(self.cf)
1641n/a
1642n/a def testSendMultiFrames(self):
1643n/a cf, addr = self.s.recvfrom(self.bufsize)
1644n/a self.assertEqual(self.cf1, cf)
1645n/a
1646n/a cf, addr = self.s.recvfrom(self.bufsize)
1647n/a self.assertEqual(self.cf2, cf)
1648n/a
1649n/a def _testSendMultiFrames(self):
1650n/a self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
1651n/a self.cli.send(self.cf1)
1652n/a
1653n/a self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
1654n/a self.cli.send(self.cf2)
1655n/a
1656n/a @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1657n/a 'socket.CAN_BCM required for this test.')
1658n/a def _testBCM(self):
1659n/a cf, addr = self.cli.recvfrom(self.bufsize)
1660n/a self.assertEqual(self.cf, cf)
1661n/a can_id, can_dlc, data = self.dissect_can_frame(cf)
1662n/a self.assertEqual(self.can_id, can_id)
1663n/a self.assertEqual(self.data, data)
1664n/a
1665n/a @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
1666n/a 'socket.CAN_BCM required for this test.')
1667n/a def testBCM(self):
1668n/a bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
1669n/a self.addCleanup(bcm.close)
1670n/a bcm.connect((self.interface,))
1671n/a self.can_id = 0x123
1672n/a self.data = bytes([0xc0, 0xff, 0xee])
1673n/a self.cf = self.build_can_frame(self.can_id, self.data)
1674n/a opcode = socket.CAN_BCM_TX_SEND
1675n/a flags = 0
1676n/a count = 0
1677n/a ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
1678n/a bcm_can_id = 0x0222
1679n/a nframes = 1
1680n/a assert len(self.cf) == 16
1681n/a header = struct.pack(self.bcm_cmd_msg_fmt,
1682n/a opcode,
1683n/a flags,
1684n/a count,
1685n/a ival1_seconds,
1686n/a ival1_usec,
1687n/a ival2_seconds,
1688n/a ival2_usec,
1689n/a bcm_can_id,
1690n/a nframes,
1691n/a )
1692n/a header_plus_frame = header + self.cf
1693n/a bytes_sent = bcm.send(header_plus_frame)
1694n/a self.assertEqual(bytes_sent, len(header_plus_frame))
1695n/a
1696n/a
1697n/a@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
1698n/aclass BasicRDSTest(unittest.TestCase):
1699n/a
1700n/a def testCrucialConstants(self):
1701n/a socket.AF_RDS
1702n/a socket.PF_RDS
1703n/a
1704n/a def testCreateSocket(self):
1705n/a with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
1706n/a pass
1707n/a
1708n/a def testSocketBufferSize(self):
1709n/a bufsize = 16384
1710n/a with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
1711n/a s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
1712n/a s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
1713n/a
1714n/a
1715n/a@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
1716n/a@unittest.skipUnless(thread, 'Threading required for this test.')
1717n/aclass RDSTest(ThreadedRDSSocketTest):
1718n/a
1719n/a def __init__(self, methodName='runTest'):
1720n/a ThreadedRDSSocketTest.__init__(self, methodName=methodName)
1721n/a
1722n/a def setUp(self):
1723n/a super().setUp()
1724n/a self.evt = threading.Event()
1725n/a
1726n/a def testSendAndRecv(self):
1727n/a data, addr = self.serv.recvfrom(self.bufsize)
1728n/a self.assertEqual(self.data, data)
1729n/a self.assertEqual(self.cli_addr, addr)
1730n/a
1731n/a def _testSendAndRecv(self):
1732n/a self.data = b'spam'
1733n/a self.cli.sendto(self.data, 0, (HOST, self.port))
1734n/a
1735n/a def testPeek(self):
1736n/a data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
1737n/a self.assertEqual(self.data, data)
1738n/a data, addr = self.serv.recvfrom(self.bufsize)
1739n/a self.assertEqual(self.data, data)
1740n/a
1741n/a def _testPeek(self):
1742n/a self.data = b'spam'
1743n/a self.cli.sendto(self.data, 0, (HOST, self.port))
1744n/a
1745n/a @requireAttrs(socket.socket, 'recvmsg')
1746n/a def testSendAndRecvMsg(self):
1747n/a data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
1748n/a self.assertEqual(self.data, data)
1749n/a
1750n/a @requireAttrs(socket.socket, 'sendmsg')
1751n/a def _testSendAndRecvMsg(self):
1752n/a self.data = b'hello ' * 10
1753n/a self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
1754n/a
1755n/a def testSendAndRecvMulti(self):
1756n/a data, addr = self.serv.recvfrom(self.bufsize)
1757n/a self.assertEqual(self.data1, data)
1758n/a
1759n/a data, addr = self.serv.recvfrom(self.bufsize)
1760n/a self.assertEqual(self.data2, data)
1761n/a
1762n/a def _testSendAndRecvMulti(self):
1763n/a self.data1 = b'bacon'
1764n/a self.cli.sendto(self.data1, 0, (HOST, self.port))
1765n/a
1766n/a self.data2 = b'egg'
1767n/a self.cli.sendto(self.data2, 0, (HOST, self.port))
1768n/a
1769n/a def testSelect(self):
1770n/a r, w, x = select.select([self.serv], [], [], 3.0)
1771n/a self.assertIn(self.serv, r)
1772n/a data, addr = self.serv.recvfrom(self.bufsize)
1773n/a self.assertEqual(self.data, data)
1774n/a
1775n/a def _testSelect(self):
1776n/a self.data = b'select'
1777n/a self.cli.sendto(self.data, 0, (HOST, self.port))
1778n/a
1779n/a def testCongestion(self):
1780n/a # wait until the sender is done
1781n/a self.evt.wait()
1782n/a
1783n/a def _testCongestion(self):
1784n/a # test the behavior in case of congestion
1785n/a self.data = b'fill'
1786n/a self.cli.setblocking(False)
1787n/a try:
1788n/a # try to lower the receiver's socket buffer size
1789n/a self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
1790n/a except OSError:
1791n/a pass
1792n/a with self.assertRaises(OSError) as cm:
1793n/a try:
1794n/a # fill the receiver's socket buffer
1795n/a while True:
1796n/a self.cli.sendto(self.data, 0, (HOST, self.port))
1797n/a finally:
1798n/a # signal the receiver we're done
1799n/a self.evt.set()
1800n/a # sendto() should have failed with ENOBUFS
1801n/a self.assertEqual(cm.exception.errno, errno.ENOBUFS)
1802n/a # and we should have received a congestion notification through poll
1803n/a r, w, x = select.select([self.serv], [], [], 3.0)
1804n/a self.assertIn(self.serv, r)
1805n/a
1806n/a
1807n/a@unittest.skipUnless(thread, 'Threading required for this test.')
1808n/aclass BasicTCPTest(SocketConnectedTest):
1809n/a
1810n/a def __init__(self, methodName='runTest'):
1811n/a SocketConnectedTest.__init__(self, methodName=methodName)
1812n/a
1813n/a def testRecv(self):
1814n/a # Testing large receive over TCP
1815n/a msg = self.cli_conn.recv(1024)
1816n/a self.assertEqual(msg, MSG)
1817n/a
1818n/a def _testRecv(self):
1819n/a self.serv_conn.send(MSG)
1820n/a
1821n/a def testOverFlowRecv(self):
1822n/a # Testing receive in chunks over TCP
1823n/a seg1 = self.cli_conn.recv(len(MSG) - 3)
1824n/a seg2 = self.cli_conn.recv(1024)
1825n/a msg = seg1 + seg2
1826n/a self.assertEqual(msg, MSG)
1827n/a
1828n/a def _testOverFlowRecv(self):
1829n/a self.serv_conn.send(MSG)
1830n/a
1831n/a def testRecvFrom(self):
1832n/a # Testing large recvfrom() over TCP
1833n/a msg, addr = self.cli_conn.recvfrom(1024)
1834n/a self.assertEqual(msg, MSG)
1835n/a
1836n/a def _testRecvFrom(self):
1837n/a self.serv_conn.send(MSG)
1838n/a
1839n/a def testOverFlowRecvFrom(self):
1840n/a # Testing recvfrom() in chunks over TCP
1841n/a seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
1842n/a seg2, addr = self.cli_conn.recvfrom(1024)
1843n/a msg = seg1 + seg2
1844n/a self.assertEqual(msg, MSG)
1845n/a
1846n/a def _testOverFlowRecvFrom(self):
1847n/a self.serv_conn.send(MSG)
1848n/a
1849n/a def testSendAll(self):
1850n/a # Testing sendall() with a 2048 byte string over TCP
1851n/a msg = b''
1852n/a while 1:
1853n/a read = self.cli_conn.recv(1024)
1854n/a if not read:
1855n/a break
1856n/a msg += read
1857n/a self.assertEqual(msg, b'f' * 2048)
1858n/a
1859n/a def _testSendAll(self):
1860n/a big_chunk = b'f' * 2048
1861n/a self.serv_conn.sendall(big_chunk)
1862n/a
1863n/a def testFromFd(self):
1864n/a # Testing fromfd()
1865n/a fd = self.cli_conn.fileno()
1866n/a sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
1867n/a self.addCleanup(sock.close)
1868n/a self.assertIsInstance(sock, socket.socket)
1869n/a msg = sock.recv(1024)
1870n/a self.assertEqual(msg, MSG)
1871n/a
1872n/a def _testFromFd(self):
1873n/a self.serv_conn.send(MSG)
1874n/a
1875n/a def testDup(self):
1876n/a # Testing dup()
1877n/a sock = self.cli_conn.dup()
1878n/a self.addCleanup(sock.close)
1879n/a msg = sock.recv(1024)
1880n/a self.assertEqual(msg, MSG)
1881n/a
1882n/a def _testDup(self):
1883n/a self.serv_conn.send(MSG)
1884n/a
1885n/a def testShutdown(self):
1886n/a # Testing shutdown()
1887n/a msg = self.cli_conn.recv(1024)
1888n/a self.assertEqual(msg, MSG)
1889n/a # wait for _testShutdown to finish: on OS X, when the server
1890n/a # closes the connection the client also becomes disconnected,
1891n/a # and the client's shutdown call will fail. (Issue #4397.)
1892n/a self.done.wait()
1893n/a
1894n/a def _testShutdown(self):
1895n/a self.serv_conn.send(MSG)
1896n/a self.serv_conn.shutdown(2)
1897n/a
1898n/a testShutdown_overflow = support.cpython_only(testShutdown)
1899n/a
1900n/a @support.cpython_only
1901n/a def _testShutdown_overflow(self):
1902n/a import _testcapi
1903n/a self.serv_conn.send(MSG)
1904n/a # Issue 15989
1905n/a self.assertRaises(OverflowError, self.serv_conn.shutdown,
1906n/a _testcapi.INT_MAX + 1)
1907n/a self.assertRaises(OverflowError, self.serv_conn.shutdown,
1908n/a 2 + (_testcapi.UINT_MAX + 1))
1909n/a self.serv_conn.shutdown(2)
1910n/a
1911n/a def testDetach(self):
1912n/a # Testing detach()
1913n/a fileno = self.cli_conn.fileno()
1914n/a f = self.cli_conn.detach()
1915n/a self.assertEqual(f, fileno)
1916n/a # cli_conn cannot be used anymore...
1917n/a self.assertTrue(self.cli_conn._closed)
1918n/a self.assertRaises(OSError, self.cli_conn.recv, 1024)
1919n/a self.cli_conn.close()
1920n/a # ...but we can create another socket using the (still open)
1921n/a # file descriptor
1922n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f)
1923n/a self.addCleanup(sock.close)
1924n/a msg = sock.recv(1024)
1925n/a self.assertEqual(msg, MSG)
1926n/a
1927n/a def _testDetach(self):
1928n/a self.serv_conn.send(MSG)
1929n/a
1930n/a@unittest.skipUnless(thread, 'Threading required for this test.')
1931n/aclass BasicUDPTest(ThreadedUDPSocketTest):
1932n/a
1933n/a def __init__(self, methodName='runTest'):
1934n/a ThreadedUDPSocketTest.__init__(self, methodName=methodName)
1935n/a
1936n/a def testSendtoAndRecv(self):
1937n/a # Testing sendto() and Recv() over UDP
1938n/a msg = self.serv.recv(len(MSG))
1939n/a self.assertEqual(msg, MSG)
1940n/a
1941n/a def _testSendtoAndRecv(self):
1942n/a self.cli.sendto(MSG, 0, (HOST, self.port))
1943n/a
1944n/a def testRecvFrom(self):
1945n/a # Testing recvfrom() over UDP
1946n/a msg, addr = self.serv.recvfrom(len(MSG))
1947n/a self.assertEqual(msg, MSG)
1948n/a
1949n/a def _testRecvFrom(self):
1950n/a self.cli.sendto(MSG, 0, (HOST, self.port))
1951n/a
1952n/a def testRecvFromNegative(self):
1953n/a # Negative lengths passed to recvfrom should give ValueError.
1954n/a self.assertRaises(ValueError, self.serv.recvfrom, -1)
1955n/a
1956n/a def _testRecvFromNegative(self):
1957n/a self.cli.sendto(MSG, 0, (HOST, self.port))
1958n/a
1959n/a# Tests for the sendmsg()/recvmsg() interface. Where possible, the
1960n/a# same test code is used with different families and types of socket
1961n/a# (e.g. stream, datagram), and tests using recvmsg() are repeated
1962n/a# using recvmsg_into().
1963n/a#
1964n/a# The generic test classes such as SendmsgTests and
1965n/a# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
1966n/a# supplied with sockets cli_sock and serv_sock representing the
1967n/a# client's and the server's end of the connection respectively, and
1968n/a# attributes cli_addr and serv_addr holding their (numeric where
1969n/a# appropriate) addresses.
1970n/a#
1971n/a# The final concrete test classes combine these with subclasses of
1972n/a# SocketTestBase which set up client and server sockets of a specific
1973n/a# type, and with subclasses of SendrecvmsgBase such as
1974n/a# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
1975n/a# sockets to cli_sock and serv_sock and override the methods and
1976n/a# attributes of SendrecvmsgBase to fill in destination addresses if
1977n/a# needed when sending, check for specific flags in msg_flags, etc.
1978n/a#
1979n/a# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
1980n/a# recvmsg_into().
1981n/a
1982n/a# XXX: like the other datagram (UDP) tests in this module, the code
1983n/a# here assumes that datagram delivery on the local machine will be
1984n/a# reliable.
1985n/a
1986n/aclass SendrecvmsgBase(ThreadSafeCleanupTestCase):
1987n/a # Base class for sendmsg()/recvmsg() tests.
1988n/a
1989n/a # Time in seconds to wait before considering a test failed, or
1990n/a # None for no timeout. Not all tests actually set a timeout.
1991n/a fail_timeout = 3.0
1992n/a
1993n/a def setUp(self):
1994n/a self.misc_event = threading.Event()
1995n/a super().setUp()
1996n/a
1997n/a def sendToServer(self, msg):
1998n/a # Send msg to the server.
1999n/a return self.cli_sock.send(msg)
2000n/a
2001n/a # Tuple of alternative default arguments for sendmsg() when called
2002n/a # via sendmsgToServer() (e.g. to include a destination address).
2003n/a sendmsg_to_server_defaults = ()
2004n/a
2005n/a def sendmsgToServer(self, *args):
2006n/a # Call sendmsg() on self.cli_sock with the given arguments,
2007n/a # filling in any arguments which are not supplied with the
2008n/a # corresponding items of self.sendmsg_to_server_defaults, if
2009n/a # any.
2010n/a return self.cli_sock.sendmsg(
2011n/a *(args + self.sendmsg_to_server_defaults[len(args):]))
2012n/a
2013n/a def doRecvmsg(self, sock, bufsize, *args):
2014n/a # Call recvmsg() on sock with given arguments and return its
2015n/a # result. Should be used for tests which can use either
2016n/a # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
2017n/a # this method with one which emulates it using recvmsg_into(),
2018n/a # thus allowing the same test to be used for both methods.
2019n/a result = sock.recvmsg(bufsize, *args)
2020n/a self.registerRecvmsgResult(result)
2021n/a return result
2022n/a
2023n/a def registerRecvmsgResult(self, result):
2024n/a # Called by doRecvmsg() with the return value of recvmsg() or
2025n/a # recvmsg_into(). Can be overridden to arrange cleanup based
2026n/a # on the returned ancillary data, for instance.
2027n/a pass
2028n/a
2029n/a def checkRecvmsgAddress(self, addr1, addr2):
2030n/a # Called to compare the received address with the address of
2031n/a # the peer.
2032n/a self.assertEqual(addr1, addr2)
2033n/a
2034n/a # Flags that are normally unset in msg_flags
2035n/a msg_flags_common_unset = 0
2036n/a for name in ("MSG_CTRUNC", "MSG_OOB"):
2037n/a msg_flags_common_unset |= getattr(socket, name, 0)
2038n/a
2039n/a # Flags that are normally set
2040n/a msg_flags_common_set = 0
2041n/a
2042n/a # Flags set when a complete record has been received (e.g. MSG_EOR
2043n/a # for SCTP)
2044n/a msg_flags_eor_indicator = 0
2045n/a
2046n/a # Flags set when a complete record has not been received
2047n/a # (e.g. MSG_TRUNC for datagram sockets)
2048n/a msg_flags_non_eor_indicator = 0
2049n/a
2050n/a def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
2051n/a # Method to check the value of msg_flags returned by recvmsg[_into]().
2052n/a #
2053n/a # Checks that all bits in msg_flags_common_set attribute are
2054n/a # set in "flags" and all bits in msg_flags_common_unset are
2055n/a # unset.
2056n/a #
2057n/a # The "eor" argument specifies whether the flags should
2058n/a # indicate that a full record (or datagram) has been received.
2059n/a # If "eor" is None, no checks are done; otherwise, checks
2060n/a # that:
2061n/a #
2062n/a # * if "eor" is true, all bits in msg_flags_eor_indicator are
2063n/a # set and all bits in msg_flags_non_eor_indicator are unset
2064n/a #
2065n/a # * if "eor" is false, all bits in msg_flags_non_eor_indicator
2066n/a # are set and all bits in msg_flags_eor_indicator are unset
2067n/a #
2068n/a # If "checkset" and/or "checkunset" are supplied, they require
2069n/a # the given bits to be set or unset respectively, overriding
2070n/a # what the attributes require for those bits.
2071n/a #
2072n/a # If any bits are set in "ignore", they will not be checked,
2073n/a # regardless of the other inputs.
2074n/a #
2075n/a # Will raise Exception if the inputs require a bit to be both
2076n/a # set and unset, and it is not ignored.
2077n/a
2078n/a defaultset = self.msg_flags_common_set
2079n/a defaultunset = self.msg_flags_common_unset
2080n/a
2081n/a if eor:
2082n/a defaultset |= self.msg_flags_eor_indicator
2083n/a defaultunset |= self.msg_flags_non_eor_indicator
2084n/a elif eor is not None:
2085n/a defaultset |= self.msg_flags_non_eor_indicator
2086n/a defaultunset |= self.msg_flags_eor_indicator
2087n/a
2088n/a # Function arguments override defaults
2089n/a defaultset &= ~checkunset
2090n/a defaultunset &= ~checkset
2091n/a
2092n/a # Merge arguments with remaining defaults, and check for conflicts
2093n/a checkset |= defaultset
2094n/a checkunset |= defaultunset
2095n/a inboth = checkset & checkunset & ~ignore
2096n/a if inboth:
2097n/a raise Exception("contradictory set, unset requirements for flags "
2098n/a "{0:#x}".format(inboth))
2099n/a
2100n/a # Compare with given msg_flags value
2101n/a mask = (checkset | checkunset) & ~ignore
2102n/a self.assertEqual(flags & mask, checkset & mask)
2103n/a
2104n/a
2105n/aclass RecvmsgIntoMixin(SendrecvmsgBase):
2106n/a # Mixin to implement doRecvmsg() using recvmsg_into().
2107n/a
2108n/a def doRecvmsg(self, sock, bufsize, *args):
2109n/a buf = bytearray(bufsize)
2110n/a result = sock.recvmsg_into([buf], *args)
2111n/a self.registerRecvmsgResult(result)
2112n/a self.assertGreaterEqual(result[0], 0)
2113n/a self.assertLessEqual(result[0], bufsize)
2114n/a return (bytes(buf[:result[0]]),) + result[1:]
2115n/a
2116n/a
2117n/aclass SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
2118n/a # Defines flags to be checked in msg_flags for datagram sockets.
2119n/a
2120n/a @property
2121n/a def msg_flags_non_eor_indicator(self):
2122n/a return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2123n/a
2124n/a
2125n/aclass SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
2126n/a # Defines flags to be checked in msg_flags for SCTP sockets.
2127n/a
2128n/a @property
2129n/a def msg_flags_eor_indicator(self):
2130n/a return super().msg_flags_eor_indicator | socket.MSG_EOR
2131n/a
2132n/a
2133n/aclass SendrecvmsgConnectionlessBase(SendrecvmsgBase):
2134n/a # Base class for tests on connectionless-mode sockets. Users must
2135n/a # supply sockets on attributes cli and serv to be mapped to
2136n/a # cli_sock and serv_sock respectively.
2137n/a
2138n/a @property
2139n/a def serv_sock(self):
2140n/a return self.serv
2141n/a
2142n/a @property
2143n/a def cli_sock(self):
2144n/a return self.cli
2145n/a
2146n/a @property
2147n/a def sendmsg_to_server_defaults(self):
2148n/a return ([], [], 0, self.serv_addr)
2149n/a
2150n/a def sendToServer(self, msg):
2151n/a return self.cli_sock.sendto(msg, self.serv_addr)
2152n/a
2153n/a
2154n/aclass SendrecvmsgConnectedBase(SendrecvmsgBase):
2155n/a # Base class for tests on connected sockets. Users must supply
2156n/a # sockets on attributes serv_conn and cli_conn (representing the
2157n/a # connections *to* the server and the client), to be mapped to
2158n/a # cli_sock and serv_sock respectively.
2159n/a
2160n/a @property
2161n/a def serv_sock(self):
2162n/a return self.cli_conn
2163n/a
2164n/a @property
2165n/a def cli_sock(self):
2166n/a return self.serv_conn
2167n/a
2168n/a def checkRecvmsgAddress(self, addr1, addr2):
2169n/a # Address is currently "unspecified" for a connected socket,
2170n/a # so we don't examine it
2171n/a pass
2172n/a
2173n/a
2174n/aclass SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
2175n/a # Base class to set a timeout on server's socket.
2176n/a
2177n/a def setUp(self):
2178n/a super().setUp()
2179n/a self.serv_sock.settimeout(self.fail_timeout)
2180n/a
2181n/a
2182n/aclass SendmsgTests(SendrecvmsgServerTimeoutBase):
2183n/a # Tests for sendmsg() which can use any socket type and do not
2184n/a # involve recvmsg() or recvmsg_into().
2185n/a
2186n/a def testSendmsg(self):
2187n/a # Send a simple message with sendmsg().
2188n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2189n/a
2190n/a def _testSendmsg(self):
2191n/a self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
2192n/a
2193n/a def testSendmsgDataGenerator(self):
2194n/a # Send from buffer obtained from a generator (not a sequence).
2195n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2196n/a
2197n/a def _testSendmsgDataGenerator(self):
2198n/a self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
2199n/a len(MSG))
2200n/a
2201n/a def testSendmsgAncillaryGenerator(self):
2202n/a # Gather (empty) ancillary data from a generator.
2203n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2204n/a
2205n/a def _testSendmsgAncillaryGenerator(self):
2206n/a self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
2207n/a len(MSG))
2208n/a
2209n/a def testSendmsgArray(self):
2210n/a # Send data from an array instead of the usual bytes object.
2211n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2212n/a
2213n/a def _testSendmsgArray(self):
2214n/a self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
2215n/a len(MSG))
2216n/a
2217n/a def testSendmsgGather(self):
2218n/a # Send message data from more than one buffer (gather write).
2219n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2220n/a
2221n/a def _testSendmsgGather(self):
2222n/a self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2223n/a
2224n/a def testSendmsgBadArgs(self):
2225n/a # Check that sendmsg() rejects invalid arguments.
2226n/a self.assertEqual(self.serv_sock.recv(1000), b"done")
2227n/a
2228n/a def _testSendmsgBadArgs(self):
2229n/a self.assertRaises(TypeError, self.cli_sock.sendmsg)
2230n/a self.assertRaises(TypeError, self.sendmsgToServer,
2231n/a b"not in an iterable")
2232n/a self.assertRaises(TypeError, self.sendmsgToServer,
2233n/a object())
2234n/a self.assertRaises(TypeError, self.sendmsgToServer,
2235n/a [object()])
2236n/a self.assertRaises(TypeError, self.sendmsgToServer,
2237n/a [MSG, object()])
2238n/a self.assertRaises(TypeError, self.sendmsgToServer,
2239n/a [MSG], object())
2240n/a self.assertRaises(TypeError, self.sendmsgToServer,
2241n/a [MSG], [], object())
2242n/a self.assertRaises(TypeError, self.sendmsgToServer,
2243n/a [MSG], [], 0, object())
2244n/a self.sendToServer(b"done")
2245n/a
2246n/a def testSendmsgBadCmsg(self):
2247n/a # Check that invalid ancillary data items are rejected.
2248n/a self.assertEqual(self.serv_sock.recv(1000), b"done")
2249n/a
2250n/a def _testSendmsgBadCmsg(self):
2251n/a self.assertRaises(TypeError, self.sendmsgToServer,
2252n/a [MSG], [object()])
2253n/a self.assertRaises(TypeError, self.sendmsgToServer,
2254n/a [MSG], [(object(), 0, b"data")])
2255n/a self.assertRaises(TypeError, self.sendmsgToServer,
2256n/a [MSG], [(0, object(), b"data")])
2257n/a self.assertRaises(TypeError, self.sendmsgToServer,
2258n/a [MSG], [(0, 0, object())])
2259n/a self.assertRaises(TypeError, self.sendmsgToServer,
2260n/a [MSG], [(0, 0)])
2261n/a self.assertRaises(TypeError, self.sendmsgToServer,
2262n/a [MSG], [(0, 0, b"data", 42)])
2263n/a self.sendToServer(b"done")
2264n/a
2265n/a @requireAttrs(socket, "CMSG_SPACE")
2266n/a def testSendmsgBadMultiCmsg(self):
2267n/a # Check that invalid ancillary data items are rejected when
2268n/a # more than one item is present.
2269n/a self.assertEqual(self.serv_sock.recv(1000), b"done")
2270n/a
2271n/a @testSendmsgBadMultiCmsg.client_skip
2272n/a def _testSendmsgBadMultiCmsg(self):
2273n/a self.assertRaises(TypeError, self.sendmsgToServer,
2274n/a [MSG], [0, 0, b""])
2275n/a self.assertRaises(TypeError, self.sendmsgToServer,
2276n/a [MSG], [(0, 0, b""), object()])
2277n/a self.sendToServer(b"done")
2278n/a
2279n/a def testSendmsgExcessCmsgReject(self):
2280n/a # Check that sendmsg() rejects excess ancillary data items
2281n/a # when the number that can be sent is limited.
2282n/a self.assertEqual(self.serv_sock.recv(1000), b"done")
2283n/a
2284n/a def _testSendmsgExcessCmsgReject(self):
2285n/a if not hasattr(socket, "CMSG_SPACE"):
2286n/a # Can only send one item
2287n/a with self.assertRaises(OSError) as cm:
2288n/a self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
2289n/a self.assertIsNone(cm.exception.errno)
2290n/a self.sendToServer(b"done")
2291n/a
2292n/a def testSendmsgAfterClose(self):
2293n/a # Check that sendmsg() fails on a closed socket.
2294n/a pass
2295n/a
2296n/a def _testSendmsgAfterClose(self):
2297n/a self.cli_sock.close()
2298n/a self.assertRaises(OSError, self.sendmsgToServer, [MSG])
2299n/a
2300n/a
2301n/aclass SendmsgStreamTests(SendmsgTests):
2302n/a # Tests for sendmsg() which require a stream socket and do not
2303n/a # involve recvmsg() or recvmsg_into().
2304n/a
2305n/a def testSendmsgExplicitNoneAddr(self):
2306n/a # Check that peer address can be specified as None.
2307n/a self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
2308n/a
2309n/a def _testSendmsgExplicitNoneAddr(self):
2310n/a self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
2311n/a
2312n/a def testSendmsgTimeout(self):
2313n/a # Check that timeout works with sendmsg().
2314n/a self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2315n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2316n/a
2317n/a def _testSendmsgTimeout(self):
2318n/a try:
2319n/a self.cli_sock.settimeout(0.03)
2320n/a with self.assertRaises(socket.timeout):
2321n/a while True:
2322n/a self.sendmsgToServer([b"a"*512])
2323n/a finally:
2324n/a self.misc_event.set()
2325n/a
2326n/a # XXX: would be nice to have more tests for sendmsg flags argument.
2327n/a
2328n/a # Linux supports MSG_DONTWAIT when sending, but in general, it
2329n/a # only works when receiving. Could add other platforms if they
2330n/a # support it too.
2331n/a @skipWithClientIf(sys.platform not in {"linux"},
2332n/a "MSG_DONTWAIT not known to work on this platform when "
2333n/a "sending")
2334n/a def testSendmsgDontWait(self):
2335n/a # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
2336n/a self.assertEqual(self.serv_sock.recv(512), b"a"*512)
2337n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2338n/a
2339n/a @testSendmsgDontWait.client_skip
2340n/a def _testSendmsgDontWait(self):
2341n/a try:
2342n/a with self.assertRaises(OSError) as cm:
2343n/a while True:
2344n/a self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
2345n/a self.assertIn(cm.exception.errno,
2346n/a (errno.EAGAIN, errno.EWOULDBLOCK))
2347n/a finally:
2348n/a self.misc_event.set()
2349n/a
2350n/a
2351n/aclass SendmsgConnectionlessTests(SendmsgTests):
2352n/a # Tests for sendmsg() which require a connectionless-mode
2353n/a # (e.g. datagram) socket, and do not involve recvmsg() or
2354n/a # recvmsg_into().
2355n/a
2356n/a def testSendmsgNoDestAddr(self):
2357n/a # Check that sendmsg() fails when no destination address is
2358n/a # given for unconnected socket.
2359n/a pass
2360n/a
2361n/a def _testSendmsgNoDestAddr(self):
2362n/a self.assertRaises(OSError, self.cli_sock.sendmsg,
2363n/a [MSG])
2364n/a self.assertRaises(OSError, self.cli_sock.sendmsg,
2365n/a [MSG], [], 0, None)
2366n/a
2367n/a
2368n/aclass RecvmsgGenericTests(SendrecvmsgBase):
2369n/a # Tests for recvmsg() which can also be emulated using
2370n/a # recvmsg_into(), and can use any socket type.
2371n/a
2372n/a def testRecvmsg(self):
2373n/a # Receive a simple message with recvmsg[_into]().
2374n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2375n/a self.assertEqual(msg, MSG)
2376n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2377n/a self.assertEqual(ancdata, [])
2378n/a self.checkFlags(flags, eor=True)
2379n/a
2380n/a def _testRecvmsg(self):
2381n/a self.sendToServer(MSG)
2382n/a
2383n/a def testRecvmsgExplicitDefaults(self):
2384n/a # Test recvmsg[_into]() with default arguments provided explicitly.
2385n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2386n/a len(MSG), 0, 0)
2387n/a self.assertEqual(msg, MSG)
2388n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2389n/a self.assertEqual(ancdata, [])
2390n/a self.checkFlags(flags, eor=True)
2391n/a
2392n/a def _testRecvmsgExplicitDefaults(self):
2393n/a self.sendToServer(MSG)
2394n/a
2395n/a def testRecvmsgShorter(self):
2396n/a # Receive a message smaller than buffer.
2397n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2398n/a len(MSG) + 42)
2399n/a self.assertEqual(msg, MSG)
2400n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2401n/a self.assertEqual(ancdata, [])
2402n/a self.checkFlags(flags, eor=True)
2403n/a
2404n/a def _testRecvmsgShorter(self):
2405n/a self.sendToServer(MSG)
2406n/a
2407n/a # FreeBSD < 8 doesn't always set the MSG_TRUNC flag when a truncated
2408n/a # datagram is received (issue #13001).
2409n/a @support.requires_freebsd_version(8)
2410n/a def testRecvmsgTrunc(self):
2411n/a # Receive part of message, check for truncation indicators.
2412n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2413n/a len(MSG) - 3)
2414n/a self.assertEqual(msg, MSG[:-3])
2415n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2416n/a self.assertEqual(ancdata, [])
2417n/a self.checkFlags(flags, eor=False)
2418n/a
2419n/a @support.requires_freebsd_version(8)
2420n/a def _testRecvmsgTrunc(self):
2421n/a self.sendToServer(MSG)
2422n/a
2423n/a def testRecvmsgShortAncillaryBuf(self):
2424n/a # Test ancillary data buffer too small to hold any ancillary data.
2425n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2426n/a len(MSG), 1)
2427n/a self.assertEqual(msg, MSG)
2428n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2429n/a self.assertEqual(ancdata, [])
2430n/a self.checkFlags(flags, eor=True)
2431n/a
2432n/a def _testRecvmsgShortAncillaryBuf(self):
2433n/a self.sendToServer(MSG)
2434n/a
2435n/a def testRecvmsgLongAncillaryBuf(self):
2436n/a # Test large ancillary data buffer.
2437n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2438n/a len(MSG), 10240)
2439n/a self.assertEqual(msg, MSG)
2440n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2441n/a self.assertEqual(ancdata, [])
2442n/a self.checkFlags(flags, eor=True)
2443n/a
2444n/a def _testRecvmsgLongAncillaryBuf(self):
2445n/a self.sendToServer(MSG)
2446n/a
2447n/a def testRecvmsgAfterClose(self):
2448n/a # Check that recvmsg[_into]() fails on a closed socket.
2449n/a self.serv_sock.close()
2450n/a self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
2451n/a
2452n/a def _testRecvmsgAfterClose(self):
2453n/a pass
2454n/a
2455n/a def testRecvmsgTimeout(self):
2456n/a # Check that timeout works.
2457n/a try:
2458n/a self.serv_sock.settimeout(0.03)
2459n/a self.assertRaises(socket.timeout,
2460n/a self.doRecvmsg, self.serv_sock, len(MSG))
2461n/a finally:
2462n/a self.misc_event.set()
2463n/a
2464n/a def _testRecvmsgTimeout(self):
2465n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
2466n/a
2467n/a @requireAttrs(socket, "MSG_PEEK")
2468n/a def testRecvmsgPeek(self):
2469n/a # Check that MSG_PEEK in flags enables examination of pending
2470n/a # data without consuming it.
2471n/a
2472n/a # Receive part of data with MSG_PEEK.
2473n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2474n/a len(MSG) - 3, 0,
2475n/a socket.MSG_PEEK)
2476n/a self.assertEqual(msg, MSG[:-3])
2477n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2478n/a self.assertEqual(ancdata, [])
2479n/a # Ignoring MSG_TRUNC here (so this test is the same for stream
2480n/a # and datagram sockets). Some wording in POSIX seems to
2481n/a # suggest that it needn't be set when peeking, but that may
2482n/a # just be a slip.
2483n/a self.checkFlags(flags, eor=False,
2484n/a ignore=getattr(socket, "MSG_TRUNC", 0))
2485n/a
2486n/a # Receive all data with MSG_PEEK.
2487n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2488n/a len(MSG), 0,
2489n/a socket.MSG_PEEK)
2490n/a self.assertEqual(msg, MSG)
2491n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2492n/a self.assertEqual(ancdata, [])
2493n/a self.checkFlags(flags, eor=True)
2494n/a
2495n/a # Check that the same data can still be received normally.
2496n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2497n/a self.assertEqual(msg, MSG)
2498n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2499n/a self.assertEqual(ancdata, [])
2500n/a self.checkFlags(flags, eor=True)
2501n/a
2502n/a @testRecvmsgPeek.client_skip
2503n/a def _testRecvmsgPeek(self):
2504n/a self.sendToServer(MSG)
2505n/a
2506n/a @requireAttrs(socket.socket, "sendmsg")
2507n/a def testRecvmsgFromSendmsg(self):
2508n/a # Test receiving with recvmsg[_into]() when message is sent
2509n/a # using sendmsg().
2510n/a self.serv_sock.settimeout(self.fail_timeout)
2511n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
2512n/a self.assertEqual(msg, MSG)
2513n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2514n/a self.assertEqual(ancdata, [])
2515n/a self.checkFlags(flags, eor=True)
2516n/a
2517n/a @testRecvmsgFromSendmsg.client_skip
2518n/a def _testRecvmsgFromSendmsg(self):
2519n/a self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
2520n/a
2521n/a
2522n/aclass RecvmsgGenericStreamTests(RecvmsgGenericTests):
2523n/a # Tests which require a stream socket and can use either recvmsg()
2524n/a # or recvmsg_into().
2525n/a
2526n/a def testRecvmsgEOF(self):
2527n/a # Receive end-of-stream indicator (b"", peer socket closed).
2528n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
2529n/a self.assertEqual(msg, b"")
2530n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2531n/a self.assertEqual(ancdata, [])
2532n/a self.checkFlags(flags, eor=None) # Might not have end-of-record marker
2533n/a
2534n/a def _testRecvmsgEOF(self):
2535n/a self.cli_sock.close()
2536n/a
2537n/a def testRecvmsgOverflow(self):
2538n/a # Receive a message in more than one chunk.
2539n/a seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2540n/a len(MSG) - 3)
2541n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2542n/a self.assertEqual(ancdata, [])
2543n/a self.checkFlags(flags, eor=False)
2544n/a
2545n/a seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
2546n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2547n/a self.assertEqual(ancdata, [])
2548n/a self.checkFlags(flags, eor=True)
2549n/a
2550n/a msg = seg1 + seg2
2551n/a self.assertEqual(msg, MSG)
2552n/a
2553n/a def _testRecvmsgOverflow(self):
2554n/a self.sendToServer(MSG)
2555n/a
2556n/a
2557n/aclass RecvmsgTests(RecvmsgGenericTests):
2558n/a # Tests for recvmsg() which can use any socket type.
2559n/a
2560n/a def testRecvmsgBadArgs(self):
2561n/a # Check that recvmsg() rejects invalid arguments.
2562n/a self.assertRaises(TypeError, self.serv_sock.recvmsg)
2563n/a self.assertRaises(ValueError, self.serv_sock.recvmsg,
2564n/a -1, 0, 0)
2565n/a self.assertRaises(ValueError, self.serv_sock.recvmsg,
2566n/a len(MSG), -1, 0)
2567n/a self.assertRaises(TypeError, self.serv_sock.recvmsg,
2568n/a [bytearray(10)], 0, 0)
2569n/a self.assertRaises(TypeError, self.serv_sock.recvmsg,
2570n/a object(), 0, 0)
2571n/a self.assertRaises(TypeError, self.serv_sock.recvmsg,
2572n/a len(MSG), object(), 0)
2573n/a self.assertRaises(TypeError, self.serv_sock.recvmsg,
2574n/a len(MSG), 0, object())
2575n/a
2576n/a msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
2577n/a self.assertEqual(msg, MSG)
2578n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2579n/a self.assertEqual(ancdata, [])
2580n/a self.checkFlags(flags, eor=True)
2581n/a
2582n/a def _testRecvmsgBadArgs(self):
2583n/a self.sendToServer(MSG)
2584n/a
2585n/a
2586n/aclass RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
2587n/a # Tests for recvmsg_into() which can use any socket type.
2588n/a
2589n/a def testRecvmsgIntoBadArgs(self):
2590n/a # Check that recvmsg_into() rejects invalid arguments.
2591n/a buf = bytearray(len(MSG))
2592n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
2593n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2594n/a len(MSG), 0, 0)
2595n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2596n/a buf, 0, 0)
2597n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2598n/a [object()], 0, 0)
2599n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2600n/a [b"I'm not writable"], 0, 0)
2601n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2602n/a [buf, object()], 0, 0)
2603n/a self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
2604n/a [buf], -1, 0)
2605n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2606n/a [buf], object(), 0)
2607n/a self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
2608n/a [buf], 0, object())
2609n/a
2610n/a nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
2611n/a self.assertEqual(nbytes, len(MSG))
2612n/a self.assertEqual(buf, bytearray(MSG))
2613n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2614n/a self.assertEqual(ancdata, [])
2615n/a self.checkFlags(flags, eor=True)
2616n/a
2617n/a def _testRecvmsgIntoBadArgs(self):
2618n/a self.sendToServer(MSG)
2619n/a
2620n/a def testRecvmsgIntoGenerator(self):
2621n/a # Receive into buffer obtained from a generator (not a sequence).
2622n/a buf = bytearray(len(MSG))
2623n/a nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
2624n/a (o for o in [buf]))
2625n/a self.assertEqual(nbytes, len(MSG))
2626n/a self.assertEqual(buf, bytearray(MSG))
2627n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2628n/a self.assertEqual(ancdata, [])
2629n/a self.checkFlags(flags, eor=True)
2630n/a
2631n/a def _testRecvmsgIntoGenerator(self):
2632n/a self.sendToServer(MSG)
2633n/a
2634n/a def testRecvmsgIntoArray(self):
2635n/a # Receive into an array rather than the usual bytearray.
2636n/a buf = array.array("B", [0] * len(MSG))
2637n/a nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
2638n/a self.assertEqual(nbytes, len(MSG))
2639n/a self.assertEqual(buf.tobytes(), MSG)
2640n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2641n/a self.assertEqual(ancdata, [])
2642n/a self.checkFlags(flags, eor=True)
2643n/a
2644n/a def _testRecvmsgIntoArray(self):
2645n/a self.sendToServer(MSG)
2646n/a
2647n/a def testRecvmsgIntoScatter(self):
2648n/a # Receive into multiple buffers (scatter write).
2649n/a b1 = bytearray(b"----")
2650n/a b2 = bytearray(b"0123456789")
2651n/a b3 = bytearray(b"--------------")
2652n/a nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
2653n/a [b1, memoryview(b2)[2:9], b3])
2654n/a self.assertEqual(nbytes, len(b"Mary had a little lamb"))
2655n/a self.assertEqual(b1, bytearray(b"Mary"))
2656n/a self.assertEqual(b2, bytearray(b"01 had a 9"))
2657n/a self.assertEqual(b3, bytearray(b"little lamb---"))
2658n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2659n/a self.assertEqual(ancdata, [])
2660n/a self.checkFlags(flags, eor=True)
2661n/a
2662n/a def _testRecvmsgIntoScatter(self):
2663n/a self.sendToServer(b"Mary had a little lamb")
2664n/a
2665n/a
2666n/aclass CmsgMacroTests(unittest.TestCase):
2667n/a # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests
2668n/a # assumptions used by sendmsg() and recvmsg[_into](), which share
2669n/a # code with these functions.
2670n/a
2671n/a # Match the definition in socketmodule.c
2672n/a try:
2673n/a import _testcapi
2674n/a except ImportError:
2675n/a socklen_t_limit = 0x7fffffff
2676n/a else:
2677n/a socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
2678n/a
2679n/a @requireAttrs(socket, "CMSG_LEN")
2680n/a def testCMSG_LEN(self):
2681n/a # Test CMSG_LEN() with various valid and invalid values,
2682n/a # checking the assumptions used by recvmsg() and sendmsg().
2683n/a toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
2684n/a values = list(range(257)) + list(range(toobig - 257, toobig))
2685n/a
2686n/a # struct cmsghdr has at least three members, two of which are ints
2687n/a self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
2688n/a for n in values:
2689n/a ret = socket.CMSG_LEN(n)
2690n/a # This is how recvmsg() calculates the data size
2691n/a self.assertEqual(ret - socket.CMSG_LEN(0), n)
2692n/a self.assertLessEqual(ret, self.socklen_t_limit)
2693n/a
2694n/a self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
2695n/a # sendmsg() shares code with these functions, and requires
2696n/a # that it reject values over the limit.
2697n/a self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
2698n/a self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
2699n/a
2700n/a @requireAttrs(socket, "CMSG_SPACE")
2701n/a def testCMSG_SPACE(self):
2702n/a # Test CMSG_SPACE() with various valid and invalid values,
2703n/a # checking the assumptions used by sendmsg().
2704n/a toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
2705n/a values = list(range(257)) + list(range(toobig - 257, toobig))
2706n/a
2707n/a last = socket.CMSG_SPACE(0)
2708n/a # struct cmsghdr has at least three members, two of which are ints
2709n/a self.assertGreater(last, array.array("i").itemsize * 2)
2710n/a for n in values:
2711n/a ret = socket.CMSG_SPACE(n)
2712n/a self.assertGreaterEqual(ret, last)
2713n/a self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
2714n/a self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
2715n/a self.assertLessEqual(ret, self.socklen_t_limit)
2716n/a last = ret
2717n/a
2718n/a self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
2719n/a # sendmsg() shares code with these functions, and requires
2720n/a # that it reject values over the limit.
2721n/a self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
2722n/a self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
2723n/a
2724n/a
2725n/aclass SCMRightsTest(SendrecvmsgServerTimeoutBase):
2726n/a # Tests for file descriptor passing on Unix-domain sockets.
2727n/a
2728n/a # Invalid file descriptor value that's unlikely to evaluate to a
2729n/a # real FD even if one of its bytes is replaced with a different
2730n/a # value (which shouldn't actually happen).
2731n/a badfd = -0x5555
2732n/a
2733n/a def newFDs(self, n):
2734n/a # Return a list of n file descriptors for newly-created files
2735n/a # containing their list indices as ASCII numbers.
2736n/a fds = []
2737n/a for i in range(n):
2738n/a fd, path = tempfile.mkstemp()
2739n/a self.addCleanup(os.unlink, path)
2740n/a self.addCleanup(os.close, fd)
2741n/a os.write(fd, str(i).encode())
2742n/a fds.append(fd)
2743n/a return fds
2744n/a
2745n/a def checkFDs(self, fds):
2746n/a # Check that the file descriptors in the given list contain
2747n/a # their correct list indices as ASCII numbers.
2748n/a for n, fd in enumerate(fds):
2749n/a os.lseek(fd, 0, os.SEEK_SET)
2750n/a self.assertEqual(os.read(fd, 1024), str(n).encode())
2751n/a
2752n/a def registerRecvmsgResult(self, result):
2753n/a self.addCleanup(self.closeRecvmsgFDs, result)
2754n/a
2755n/a def closeRecvmsgFDs(self, recvmsg_result):
2756n/a # Close all file descriptors specified in the ancillary data
2757n/a # of the given return value from recvmsg() or recvmsg_into().
2758n/a for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
2759n/a if (cmsg_level == socket.SOL_SOCKET and
2760n/a cmsg_type == socket.SCM_RIGHTS):
2761n/a fds = array.array("i")
2762n/a fds.frombytes(cmsg_data[:
2763n/a len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
2764n/a for fd in fds:
2765n/a os.close(fd)
2766n/a
2767n/a def createAndSendFDs(self, n):
2768n/a # Send n new file descriptors created by newFDs() to the
2769n/a # server, with the constant MSG as the non-ancillary data.
2770n/a self.assertEqual(
2771n/a self.sendmsgToServer([MSG],
2772n/a [(socket.SOL_SOCKET,
2773n/a socket.SCM_RIGHTS,
2774n/a array.array("i", self.newFDs(n)))]),
2775n/a len(MSG))
2776n/a
2777n/a def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
2778n/a # Check that constant MSG was received with numfds file
2779n/a # descriptors in a maximum of maxcmsgs control messages (which
2780n/a # must contain only complete integers). By default, check
2781n/a # that MSG_CTRUNC is unset, but ignore any flags in
2782n/a # ignoreflags.
2783n/a msg, ancdata, flags, addr = result
2784n/a self.assertEqual(msg, MSG)
2785n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2786n/a self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
2787n/a ignore=ignoreflags)
2788n/a
2789n/a self.assertIsInstance(ancdata, list)
2790n/a self.assertLessEqual(len(ancdata), maxcmsgs)
2791n/a fds = array.array("i")
2792n/a for item in ancdata:
2793n/a self.assertIsInstance(item, tuple)
2794n/a cmsg_level, cmsg_type, cmsg_data = item
2795n/a self.assertEqual(cmsg_level, socket.SOL_SOCKET)
2796n/a self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
2797n/a self.assertIsInstance(cmsg_data, bytes)
2798n/a self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
2799n/a fds.frombytes(cmsg_data)
2800n/a
2801n/a self.assertEqual(len(fds), numfds)
2802n/a self.checkFDs(fds)
2803n/a
2804n/a def testFDPassSimple(self):
2805n/a # Pass a single FD (array read from bytes object).
2806n/a self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
2807n/a len(MSG), 10240))
2808n/a
2809n/a def _testFDPassSimple(self):
2810n/a self.assertEqual(
2811n/a self.sendmsgToServer(
2812n/a [MSG],
2813n/a [(socket.SOL_SOCKET,
2814n/a socket.SCM_RIGHTS,
2815n/a array.array("i", self.newFDs(1)).tobytes())]),
2816n/a len(MSG))
2817n/a
2818n/a def testMultipleFDPass(self):
2819n/a # Pass multiple FDs in a single array.
2820n/a self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
2821n/a len(MSG), 10240))
2822n/a
2823n/a def _testMultipleFDPass(self):
2824n/a self.createAndSendFDs(4)
2825n/a
2826n/a @requireAttrs(socket, "CMSG_SPACE")
2827n/a def testFDPassCMSG_SPACE(self):
2828n/a # Test using CMSG_SPACE() to calculate ancillary buffer size.
2829n/a self.checkRecvmsgFDs(
2830n/a 4, self.doRecvmsg(self.serv_sock, len(MSG),
2831n/a socket.CMSG_SPACE(4 * SIZEOF_INT)))
2832n/a
2833n/a @testFDPassCMSG_SPACE.client_skip
2834n/a def _testFDPassCMSG_SPACE(self):
2835n/a self.createAndSendFDs(4)
2836n/a
2837n/a def testFDPassCMSG_LEN(self):
2838n/a # Test using CMSG_LEN() to calculate ancillary buffer size.
2839n/a self.checkRecvmsgFDs(1,
2840n/a self.doRecvmsg(self.serv_sock, len(MSG),
2841n/a socket.CMSG_LEN(4 * SIZEOF_INT)),
2842n/a # RFC 3542 says implementations may set
2843n/a # MSG_CTRUNC if there isn't enough space
2844n/a # for trailing padding.
2845n/a ignoreflags=socket.MSG_CTRUNC)
2846n/a
2847n/a def _testFDPassCMSG_LEN(self):
2848n/a self.createAndSendFDs(1)
2849n/a
2850n/a @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
2851n/a @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
2852n/a @requireAttrs(socket, "CMSG_SPACE")
2853n/a def testFDPassSeparate(self):
2854n/a # Pass two FDs in two separate arrays. Arrays may be combined
2855n/a # into a single control message by the OS.
2856n/a self.checkRecvmsgFDs(2,
2857n/a self.doRecvmsg(self.serv_sock, len(MSG), 10240),
2858n/a maxcmsgs=2)
2859n/a
2860n/a @testFDPassSeparate.client_skip
2861n/a @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
2862n/a @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
2863n/a def _testFDPassSeparate(self):
2864n/a fd0, fd1 = self.newFDs(2)
2865n/a self.assertEqual(
2866n/a self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
2867n/a socket.SCM_RIGHTS,
2868n/a array.array("i", [fd0])),
2869n/a (socket.SOL_SOCKET,
2870n/a socket.SCM_RIGHTS,
2871n/a array.array("i", [fd1]))]),
2872n/a len(MSG))
2873n/a
2874n/a @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
2875n/a @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
2876n/a @requireAttrs(socket, "CMSG_SPACE")
2877n/a def testFDPassSeparateMinSpace(self):
2878n/a # Pass two FDs in two separate arrays, receiving them into the
2879n/a # minimum space for two arrays.
2880n/a self.checkRecvmsgFDs(2,
2881n/a self.doRecvmsg(self.serv_sock, len(MSG),
2882n/a socket.CMSG_SPACE(SIZEOF_INT) +
2883n/a socket.CMSG_LEN(SIZEOF_INT)),
2884n/a maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
2885n/a
2886n/a @testFDPassSeparateMinSpace.client_skip
2887n/a @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
2888n/a @unittest.skipIf(sys.platform.startswith("aix"), "skipping, see issue #22397")
2889n/a def _testFDPassSeparateMinSpace(self):
2890n/a fd0, fd1 = self.newFDs(2)
2891n/a self.assertEqual(
2892n/a self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
2893n/a socket.SCM_RIGHTS,
2894n/a array.array("i", [fd0])),
2895n/a (socket.SOL_SOCKET,
2896n/a socket.SCM_RIGHTS,
2897n/a array.array("i", [fd1]))]),
2898n/a len(MSG))
2899n/a
2900n/a def sendAncillaryIfPossible(self, msg, ancdata):
2901n/a # Try to send msg and ancdata to server, but if the system
2902n/a # call fails, just send msg with no ancillary data.
2903n/a try:
2904n/a nbytes = self.sendmsgToServer([msg], ancdata)
2905n/a except OSError as e:
2906n/a # Check that it was the system call that failed
2907n/a self.assertIsInstance(e.errno, int)
2908n/a nbytes = self.sendmsgToServer([msg])
2909n/a self.assertEqual(nbytes, len(msg))
2910n/a
2911n/a @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
2912n/a def testFDPassEmpty(self):
2913n/a # Try to pass an empty FD array. Can receive either no array
2914n/a # or an empty array.
2915n/a self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
2916n/a len(MSG), 10240),
2917n/a ignoreflags=socket.MSG_CTRUNC)
2918n/a
2919n/a def _testFDPassEmpty(self):
2920n/a self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
2921n/a socket.SCM_RIGHTS,
2922n/a b"")])
2923n/a
2924n/a def testFDPassPartialInt(self):
2925n/a # Try to pass a truncated FD array.
2926n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2927n/a len(MSG), 10240)
2928n/a self.assertEqual(msg, MSG)
2929n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2930n/a self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
2931n/a self.assertLessEqual(len(ancdata), 1)
2932n/a for cmsg_level, cmsg_type, cmsg_data in ancdata:
2933n/a self.assertEqual(cmsg_level, socket.SOL_SOCKET)
2934n/a self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
2935n/a self.assertLess(len(cmsg_data), SIZEOF_INT)
2936n/a
2937n/a def _testFDPassPartialInt(self):
2938n/a self.sendAncillaryIfPossible(
2939n/a MSG,
2940n/a [(socket.SOL_SOCKET,
2941n/a socket.SCM_RIGHTS,
2942n/a array.array("i", [self.badfd]).tobytes()[:-1])])
2943n/a
2944n/a @requireAttrs(socket, "CMSG_SPACE")
2945n/a def testFDPassPartialIntInMiddle(self):
2946n/a # Try to pass two FD arrays, the first of which is truncated.
2947n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
2948n/a len(MSG), 10240)
2949n/a self.assertEqual(msg, MSG)
2950n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2951n/a self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
2952n/a self.assertLessEqual(len(ancdata), 2)
2953n/a fds = array.array("i")
2954n/a # Arrays may have been combined in a single control message
2955n/a for cmsg_level, cmsg_type, cmsg_data in ancdata:
2956n/a self.assertEqual(cmsg_level, socket.SOL_SOCKET)
2957n/a self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
2958n/a fds.frombytes(cmsg_data[:
2959n/a len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
2960n/a self.assertLessEqual(len(fds), 2)
2961n/a self.checkFDs(fds)
2962n/a
2963n/a @testFDPassPartialIntInMiddle.client_skip
2964n/a def _testFDPassPartialIntInMiddle(self):
2965n/a fd0, fd1 = self.newFDs(2)
2966n/a self.sendAncillaryIfPossible(
2967n/a MSG,
2968n/a [(socket.SOL_SOCKET,
2969n/a socket.SCM_RIGHTS,
2970n/a array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
2971n/a (socket.SOL_SOCKET,
2972n/a socket.SCM_RIGHTS,
2973n/a array.array("i", [fd1]))])
2974n/a
2975n/a def checkTruncatedHeader(self, result, ignoreflags=0):
2976n/a # Check that no ancillary data items are returned when data is
2977n/a # truncated inside the cmsghdr structure.
2978n/a msg, ancdata, flags, addr = result
2979n/a self.assertEqual(msg, MSG)
2980n/a self.checkRecvmsgAddress(addr, self.cli_addr)
2981n/a self.assertEqual(ancdata, [])
2982n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
2983n/a ignore=ignoreflags)
2984n/a
2985n/a def testCmsgTruncNoBufSize(self):
2986n/a # Check that no ancillary data is received when no buffer size
2987n/a # is specified.
2988n/a self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
2989n/a # BSD seems to set MSG_CTRUNC only
2990n/a # if an item has been partially
2991n/a # received.
2992n/a ignoreflags=socket.MSG_CTRUNC)
2993n/a
2994n/a def _testCmsgTruncNoBufSize(self):
2995n/a self.createAndSendFDs(1)
2996n/a
2997n/a def testCmsgTrunc0(self):
2998n/a # Check that no ancillary data is received when buffer size is 0.
2999n/a self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
3000n/a ignoreflags=socket.MSG_CTRUNC)
3001n/a
3002n/a def _testCmsgTrunc0(self):
3003n/a self.createAndSendFDs(1)
3004n/a
3005n/a # Check that no ancillary data is returned for various non-zero
3006n/a # (but still too small) buffer sizes.
3007n/a
3008n/a def testCmsgTrunc1(self):
3009n/a self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
3010n/a
3011n/a def _testCmsgTrunc1(self):
3012n/a self.createAndSendFDs(1)
3013n/a
3014n/a def testCmsgTrunc2Int(self):
3015n/a # The cmsghdr structure has at least three members, two of
3016n/a # which are ints, so we still shouldn't see any ancillary
3017n/a # data.
3018n/a self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3019n/a SIZEOF_INT * 2))
3020n/a
3021n/a def _testCmsgTrunc2Int(self):
3022n/a self.createAndSendFDs(1)
3023n/a
3024n/a def testCmsgTruncLen0Minus1(self):
3025n/a self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
3026n/a socket.CMSG_LEN(0) - 1))
3027n/a
3028n/a def _testCmsgTruncLen0Minus1(self):
3029n/a self.createAndSendFDs(1)
3030n/a
3031n/a # The following tests try to truncate the control message in the
3032n/a # middle of the FD array.
3033n/a
3034n/a def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
3035n/a # Check that file descriptor data is truncated to between
3036n/a # mindata and maxdata bytes when received with buffer size
3037n/a # ancbuf, and that any complete file descriptor numbers are
3038n/a # valid.
3039n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3040n/a len(MSG), ancbuf)
3041n/a self.assertEqual(msg, MSG)
3042n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3043n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3044n/a
3045n/a if mindata == 0 and ancdata == []:
3046n/a return
3047n/a self.assertEqual(len(ancdata), 1)
3048n/a cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3049n/a self.assertEqual(cmsg_level, socket.SOL_SOCKET)
3050n/a self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
3051n/a self.assertGreaterEqual(len(cmsg_data), mindata)
3052n/a self.assertLessEqual(len(cmsg_data), maxdata)
3053n/a fds = array.array("i")
3054n/a fds.frombytes(cmsg_data[:
3055n/a len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
3056n/a self.checkFDs(fds)
3057n/a
3058n/a def testCmsgTruncLen0(self):
3059n/a self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
3060n/a
3061n/a def _testCmsgTruncLen0(self):
3062n/a self.createAndSendFDs(1)
3063n/a
3064n/a def testCmsgTruncLen0Plus1(self):
3065n/a self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
3066n/a
3067n/a def _testCmsgTruncLen0Plus1(self):
3068n/a self.createAndSendFDs(2)
3069n/a
3070n/a def testCmsgTruncLen1(self):
3071n/a self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
3072n/a maxdata=SIZEOF_INT)
3073n/a
3074n/a def _testCmsgTruncLen1(self):
3075n/a self.createAndSendFDs(2)
3076n/a
3077n/a def testCmsgTruncLen2Minus1(self):
3078n/a self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
3079n/a maxdata=(2 * SIZEOF_INT) - 1)
3080n/a
3081n/a def _testCmsgTruncLen2Minus1(self):
3082n/a self.createAndSendFDs(2)
3083n/a
3084n/a
3085n/aclass RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
3086n/a # Test sendmsg() and recvmsg[_into]() using the ancillary data
3087n/a # features of the RFC 3542 Advanced Sockets API for IPv6.
3088n/a # Currently we can only handle certain data items (e.g. traffic
3089n/a # class, hop limit, MTU discovery and fragmentation settings)
3090n/a # without resorting to unportable means such as the struct module,
3091n/a # but the tests here are aimed at testing the ancillary data
3092n/a # handling in sendmsg() and recvmsg() rather than the IPv6 API
3093n/a # itself.
3094n/a
3095n/a # Test value to use when setting hop limit of packet
3096n/a hop_limit = 2
3097n/a
3098n/a # Test value to use when setting traffic class of packet.
3099n/a # -1 means "use kernel default".
3100n/a traffic_class = -1
3101n/a
3102n/a def ancillaryMapping(self, ancdata):
3103n/a # Given ancillary data list ancdata, return a mapping from
3104n/a # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
3105n/a # Check that no (level, type) pair appears more than once.
3106n/a d = {}
3107n/a for cmsg_level, cmsg_type, cmsg_data in ancdata:
3108n/a self.assertNotIn((cmsg_level, cmsg_type), d)
3109n/a d[(cmsg_level, cmsg_type)] = cmsg_data
3110n/a return d
3111n/a
3112n/a def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
3113n/a # Receive hop limit into ancbufsize bytes of ancillary data
3114n/a # space. Check that data is MSG, ancillary data is not
3115n/a # truncated (but ignore any flags in ignoreflags), and hop
3116n/a # limit is between 0 and maxhop inclusive.
3117n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3118n/a socket.IPV6_RECVHOPLIMIT, 1)
3119n/a self.misc_event.set()
3120n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3121n/a len(MSG), ancbufsize)
3122n/a
3123n/a self.assertEqual(msg, MSG)
3124n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3125n/a self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3126n/a ignore=ignoreflags)
3127n/a
3128n/a self.assertEqual(len(ancdata), 1)
3129n/a self.assertIsInstance(ancdata[0], tuple)
3130n/a cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3131n/a self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3132n/a self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3133n/a self.assertIsInstance(cmsg_data, bytes)
3134n/a self.assertEqual(len(cmsg_data), SIZEOF_INT)
3135n/a a = array.array("i")
3136n/a a.frombytes(cmsg_data)
3137n/a self.assertGreaterEqual(a[0], 0)
3138n/a self.assertLessEqual(a[0], maxhop)
3139n/a
3140n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3141n/a def testRecvHopLimit(self):
3142n/a # Test receiving the packet hop limit as ancillary data.
3143n/a self.checkHopLimit(ancbufsize=10240)
3144n/a
3145n/a @testRecvHopLimit.client_skip
3146n/a def _testRecvHopLimit(self):
3147n/a # Need to wait until server has asked to receive ancillary
3148n/a # data, as implementations are not required to buffer it
3149n/a # otherwise.
3150n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3151n/a self.sendToServer(MSG)
3152n/a
3153n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3154n/a def testRecvHopLimitCMSG_SPACE(self):
3155n/a # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
3156n/a self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
3157n/a
3158n/a @testRecvHopLimitCMSG_SPACE.client_skip
3159n/a def _testRecvHopLimitCMSG_SPACE(self):
3160n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3161n/a self.sendToServer(MSG)
3162n/a
3163n/a # Could test receiving into buffer sized using CMSG_LEN, but RFC
3164n/a # 3542 says portable applications must provide space for trailing
3165n/a # padding. Implementations may set MSG_CTRUNC if there isn't
3166n/a # enough space for the padding.
3167n/a
3168n/a @requireAttrs(socket.socket, "sendmsg")
3169n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3170n/a def testSetHopLimit(self):
3171n/a # Test setting hop limit on outgoing packet and receiving it
3172n/a # at the other end.
3173n/a self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
3174n/a
3175n/a @testSetHopLimit.client_skip
3176n/a def _testSetHopLimit(self):
3177n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3178n/a self.assertEqual(
3179n/a self.sendmsgToServer([MSG],
3180n/a [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3181n/a array.array("i", [self.hop_limit]))]),
3182n/a len(MSG))
3183n/a
3184n/a def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
3185n/a ignoreflags=0):
3186n/a # Receive traffic class and hop limit into ancbufsize bytes of
3187n/a # ancillary data space. Check that data is MSG, ancillary
3188n/a # data is not truncated (but ignore any flags in ignoreflags),
3189n/a # and traffic class and hop limit are in range (hop limit no
3190n/a # more than maxhop).
3191n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3192n/a socket.IPV6_RECVHOPLIMIT, 1)
3193n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3194n/a socket.IPV6_RECVTCLASS, 1)
3195n/a self.misc_event.set()
3196n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3197n/a len(MSG), ancbufsize)
3198n/a
3199n/a self.assertEqual(msg, MSG)
3200n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3201n/a self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
3202n/a ignore=ignoreflags)
3203n/a self.assertEqual(len(ancdata), 2)
3204n/a ancmap = self.ancillaryMapping(ancdata)
3205n/a
3206n/a tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
3207n/a self.assertEqual(len(tcdata), SIZEOF_INT)
3208n/a a = array.array("i")
3209n/a a.frombytes(tcdata)
3210n/a self.assertGreaterEqual(a[0], 0)
3211n/a self.assertLessEqual(a[0], 255)
3212n/a
3213n/a hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
3214n/a self.assertEqual(len(hldata), SIZEOF_INT)
3215n/a a = array.array("i")
3216n/a a.frombytes(hldata)
3217n/a self.assertGreaterEqual(a[0], 0)
3218n/a self.assertLessEqual(a[0], maxhop)
3219n/a
3220n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3221n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3222n/a def testRecvTrafficClassAndHopLimit(self):
3223n/a # Test receiving traffic class and hop limit as ancillary data.
3224n/a self.checkTrafficClassAndHopLimit(ancbufsize=10240)
3225n/a
3226n/a @testRecvTrafficClassAndHopLimit.client_skip
3227n/a def _testRecvTrafficClassAndHopLimit(self):
3228n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3229n/a self.sendToServer(MSG)
3230n/a
3231n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3232n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3233n/a def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3234n/a # Test receiving traffic class and hop limit, using
3235n/a # CMSG_SPACE() to calculate buffer size.
3236n/a self.checkTrafficClassAndHopLimit(
3237n/a ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
3238n/a
3239n/a @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
3240n/a def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
3241n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3242n/a self.sendToServer(MSG)
3243n/a
3244n/a @requireAttrs(socket.socket, "sendmsg")
3245n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3246n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3247n/a def testSetTrafficClassAndHopLimit(self):
3248n/a # Test setting traffic class and hop limit on outgoing packet,
3249n/a # and receiving them at the other end.
3250n/a self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3251n/a maxhop=self.hop_limit)
3252n/a
3253n/a @testSetTrafficClassAndHopLimit.client_skip
3254n/a def _testSetTrafficClassAndHopLimit(self):
3255n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3256n/a self.assertEqual(
3257n/a self.sendmsgToServer([MSG],
3258n/a [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3259n/a array.array("i", [self.traffic_class])),
3260n/a (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3261n/a array.array("i", [self.hop_limit]))]),
3262n/a len(MSG))
3263n/a
3264n/a @requireAttrs(socket.socket, "sendmsg")
3265n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3266n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3267n/a def testOddCmsgSize(self):
3268n/a # Try to send ancillary data with first item one byte too
3269n/a # long. Fall back to sending with correct size if this fails,
3270n/a # and check that second item was handled correctly.
3271n/a self.checkTrafficClassAndHopLimit(ancbufsize=10240,
3272n/a maxhop=self.hop_limit)
3273n/a
3274n/a @testOddCmsgSize.client_skip
3275n/a def _testOddCmsgSize(self):
3276n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3277n/a try:
3278n/a nbytes = self.sendmsgToServer(
3279n/a [MSG],
3280n/a [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3281n/a array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
3282n/a (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3283n/a array.array("i", [self.hop_limit]))])
3284n/a except OSError as e:
3285n/a self.assertIsInstance(e.errno, int)
3286n/a nbytes = self.sendmsgToServer(
3287n/a [MSG],
3288n/a [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
3289n/a array.array("i", [self.traffic_class])),
3290n/a (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
3291n/a array.array("i", [self.hop_limit]))])
3292n/a self.assertEqual(nbytes, len(MSG))
3293n/a
3294n/a # Tests for proper handling of truncated ancillary data
3295n/a
3296n/a def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
3297n/a # Receive hop limit into ancbufsize bytes of ancillary data
3298n/a # space, which should be too small to contain the ancillary
3299n/a # data header (if ancbufsize is None, pass no second argument
3300n/a # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set
3301n/a # (unless included in ignoreflags), and no ancillary data is
3302n/a # returned.
3303n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3304n/a socket.IPV6_RECVHOPLIMIT, 1)
3305n/a self.misc_event.set()
3306n/a args = () if ancbufsize is None else (ancbufsize,)
3307n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3308n/a len(MSG), *args)
3309n/a
3310n/a self.assertEqual(msg, MSG)
3311n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3312n/a self.assertEqual(ancdata, [])
3313n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3314n/a ignore=ignoreflags)
3315n/a
3316n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3317n/a def testCmsgTruncNoBufSize(self):
3318n/a # Check that no ancillary data is received when no ancillary
3319n/a # buffer size is provided.
3320n/a self.checkHopLimitTruncatedHeader(ancbufsize=None,
3321n/a # BSD seems to set
3322n/a # MSG_CTRUNC only if an item
3323n/a # has been partially
3324n/a # received.
3325n/a ignoreflags=socket.MSG_CTRUNC)
3326n/a
3327n/a @testCmsgTruncNoBufSize.client_skip
3328n/a def _testCmsgTruncNoBufSize(self):
3329n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3330n/a self.sendToServer(MSG)
3331n/a
3332n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3333n/a def testSingleCmsgTrunc0(self):
3334n/a # Check that no ancillary data is received when ancillary
3335n/a # buffer size is zero.
3336n/a self.checkHopLimitTruncatedHeader(ancbufsize=0,
3337n/a ignoreflags=socket.MSG_CTRUNC)
3338n/a
3339n/a @testSingleCmsgTrunc0.client_skip
3340n/a def _testSingleCmsgTrunc0(self):
3341n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3342n/a self.sendToServer(MSG)
3343n/a
3344n/a # Check that no ancillary data is returned for various non-zero
3345n/a # (but still too small) buffer sizes.
3346n/a
3347n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3348n/a def testSingleCmsgTrunc1(self):
3349n/a self.checkHopLimitTruncatedHeader(ancbufsize=1)
3350n/a
3351n/a @testSingleCmsgTrunc1.client_skip
3352n/a def _testSingleCmsgTrunc1(self):
3353n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3354n/a self.sendToServer(MSG)
3355n/a
3356n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3357n/a def testSingleCmsgTrunc2Int(self):
3358n/a self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
3359n/a
3360n/a @testSingleCmsgTrunc2Int.client_skip
3361n/a def _testSingleCmsgTrunc2Int(self):
3362n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3363n/a self.sendToServer(MSG)
3364n/a
3365n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3366n/a def testSingleCmsgTruncLen0Minus1(self):
3367n/a self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
3368n/a
3369n/a @testSingleCmsgTruncLen0Minus1.client_skip
3370n/a def _testSingleCmsgTruncLen0Minus1(self):
3371n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3372n/a self.sendToServer(MSG)
3373n/a
3374n/a @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
3375n/a def testSingleCmsgTruncInData(self):
3376n/a # Test truncation of a control message inside its associated
3377n/a # data. The message may be returned with its data truncated,
3378n/a # or not returned at all.
3379n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3380n/a socket.IPV6_RECVHOPLIMIT, 1)
3381n/a self.misc_event.set()
3382n/a msg, ancdata, flags, addr = self.doRecvmsg(
3383n/a self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
3384n/a
3385n/a self.assertEqual(msg, MSG)
3386n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3387n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3388n/a
3389n/a self.assertLessEqual(len(ancdata), 1)
3390n/a if ancdata:
3391n/a cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3392n/a self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3393n/a self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
3394n/a self.assertLess(len(cmsg_data), SIZEOF_INT)
3395n/a
3396n/a @testSingleCmsgTruncInData.client_skip
3397n/a def _testSingleCmsgTruncInData(self):
3398n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3399n/a self.sendToServer(MSG)
3400n/a
3401n/a def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
3402n/a # Receive traffic class and hop limit into ancbufsize bytes of
3403n/a # ancillary data space, which should be large enough to
3404n/a # contain the first item, but too small to contain the header
3405n/a # of the second. Check that data is MSG, MSG_CTRUNC is set
3406n/a # (unless included in ignoreflags), and only one ancillary
3407n/a # data item is returned.
3408n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3409n/a socket.IPV6_RECVHOPLIMIT, 1)
3410n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3411n/a socket.IPV6_RECVTCLASS, 1)
3412n/a self.misc_event.set()
3413n/a msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3414n/a len(MSG), ancbufsize)
3415n/a
3416n/a self.assertEqual(msg, MSG)
3417n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3418n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
3419n/a ignore=ignoreflags)
3420n/a
3421n/a self.assertEqual(len(ancdata), 1)
3422n/a cmsg_level, cmsg_type, cmsg_data = ancdata[0]
3423n/a self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3424n/a self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
3425n/a self.assertEqual(len(cmsg_data), SIZEOF_INT)
3426n/a a = array.array("i")
3427n/a a.frombytes(cmsg_data)
3428n/a self.assertGreaterEqual(a[0], 0)
3429n/a self.assertLessEqual(a[0], 255)
3430n/a
3431n/a # Try the above test with various buffer sizes.
3432n/a
3433n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3434n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3435n/a def testSecondCmsgTrunc0(self):
3436n/a self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
3437n/a ignoreflags=socket.MSG_CTRUNC)
3438n/a
3439n/a @testSecondCmsgTrunc0.client_skip
3440n/a def _testSecondCmsgTrunc0(self):
3441n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3442n/a self.sendToServer(MSG)
3443n/a
3444n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3445n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3446n/a def testSecondCmsgTrunc1(self):
3447n/a self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
3448n/a
3449n/a @testSecondCmsgTrunc1.client_skip
3450n/a def _testSecondCmsgTrunc1(self):
3451n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3452n/a self.sendToServer(MSG)
3453n/a
3454n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3455n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3456n/a def testSecondCmsgTrunc2Int(self):
3457n/a self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3458n/a 2 * SIZEOF_INT)
3459n/a
3460n/a @testSecondCmsgTrunc2Int.client_skip
3461n/a def _testSecondCmsgTrunc2Int(self):
3462n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3463n/a self.sendToServer(MSG)
3464n/a
3465n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3466n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3467n/a def testSecondCmsgTruncLen0Minus1(self):
3468n/a self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
3469n/a socket.CMSG_LEN(0) - 1)
3470n/a
3471n/a @testSecondCmsgTruncLen0Minus1.client_skip
3472n/a def _testSecondCmsgTruncLen0Minus1(self):
3473n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3474n/a self.sendToServer(MSG)
3475n/a
3476n/a @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
3477n/a "IPV6_RECVTCLASS", "IPV6_TCLASS")
3478n/a def testSecomdCmsgTruncInData(self):
3479n/a # Test truncation of the second of two control messages inside
3480n/a # its associated data.
3481n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3482n/a socket.IPV6_RECVHOPLIMIT, 1)
3483n/a self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
3484n/a socket.IPV6_RECVTCLASS, 1)
3485n/a self.misc_event.set()
3486n/a msg, ancdata, flags, addr = self.doRecvmsg(
3487n/a self.serv_sock, len(MSG),
3488n/a socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
3489n/a
3490n/a self.assertEqual(msg, MSG)
3491n/a self.checkRecvmsgAddress(addr, self.cli_addr)
3492n/a self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
3493n/a
3494n/a cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
3495n/a
3496n/a cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3497n/a self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3498n/a cmsg_types.remove(cmsg_type)
3499n/a self.assertEqual(len(cmsg_data), SIZEOF_INT)
3500n/a a = array.array("i")
3501n/a a.frombytes(cmsg_data)
3502n/a self.assertGreaterEqual(a[0], 0)
3503n/a self.assertLessEqual(a[0], 255)
3504n/a
3505n/a if ancdata:
3506n/a cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
3507n/a self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
3508n/a cmsg_types.remove(cmsg_type)
3509n/a self.assertLess(len(cmsg_data), SIZEOF_INT)
3510n/a
3511n/a self.assertEqual(ancdata, [])
3512n/a
3513n/a @testSecomdCmsgTruncInData.client_skip
3514n/a def _testSecomdCmsgTruncInData(self):
3515n/a self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3516n/a self.sendToServer(MSG)
3517n/a
3518n/a
3519n/a# Derive concrete test classes for different socket types.
3520n/a
3521n/aclass SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
3522n/a SendrecvmsgConnectionlessBase,
3523n/a ThreadedSocketTestMixin, UDPTestBase):
3524n/a pass
3525n/a
3526n/a@requireAttrs(socket.socket, "sendmsg")
3527n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3528n/aclass SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
3529n/a pass
3530n/a
3531n/a@requireAttrs(socket.socket, "recvmsg")
3532n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3533n/aclass RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
3534n/a pass
3535n/a
3536n/a@requireAttrs(socket.socket, "recvmsg_into")
3537n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3538n/aclass RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
3539n/a pass
3540n/a
3541n/a
3542n/aclass SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
3543n/a SendrecvmsgConnectionlessBase,
3544n/a ThreadedSocketTestMixin, UDP6TestBase):
3545n/a
3546n/a def checkRecvmsgAddress(self, addr1, addr2):
3547n/a # Called to compare the received address with the address of
3548n/a # the peer, ignoring scope ID
3549n/a self.assertEqual(addr1[:-1], addr2[:-1])
3550n/a
3551n/a@requireAttrs(socket.socket, "sendmsg")
3552n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3553n/a@requireSocket("AF_INET6", "SOCK_DGRAM")
3554n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3555n/aclass SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
3556n/a pass
3557n/a
3558n/a@requireAttrs(socket.socket, "recvmsg")
3559n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3560n/a@requireSocket("AF_INET6", "SOCK_DGRAM")
3561n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3562n/aclass RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
3563n/a pass
3564n/a
3565n/a@requireAttrs(socket.socket, "recvmsg_into")
3566n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3567n/a@requireSocket("AF_INET6", "SOCK_DGRAM")
3568n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3569n/aclass RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
3570n/a pass
3571n/a
3572n/a@requireAttrs(socket.socket, "recvmsg")
3573n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3574n/a@requireAttrs(socket, "IPPROTO_IPV6")
3575n/a@requireSocket("AF_INET6", "SOCK_DGRAM")
3576n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3577n/aclass RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
3578n/a SendrecvmsgUDP6TestBase):
3579n/a pass
3580n/a
3581n/a@requireAttrs(socket.socket, "recvmsg_into")
3582n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
3583n/a@requireAttrs(socket, "IPPROTO_IPV6")
3584n/a@requireSocket("AF_INET6", "SOCK_DGRAM")
3585n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3586n/aclass RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
3587n/a RFC3542AncillaryTest,
3588n/a SendrecvmsgUDP6TestBase):
3589n/a pass
3590n/a
3591n/a
3592n/aclass SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
3593n/a ConnectedStreamTestMixin, TCPTestBase):
3594n/a pass
3595n/a
3596n/a@requireAttrs(socket.socket, "sendmsg")
3597n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3598n/aclass SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
3599n/a pass
3600n/a
3601n/a@requireAttrs(socket.socket, "recvmsg")
3602n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3603n/aclass RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
3604n/a SendrecvmsgTCPTestBase):
3605n/a pass
3606n/a
3607n/a@requireAttrs(socket.socket, "recvmsg_into")
3608n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3609n/aclass RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
3610n/a SendrecvmsgTCPTestBase):
3611n/a pass
3612n/a
3613n/a
3614n/aclass SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
3615n/a SendrecvmsgConnectedBase,
3616n/a ConnectedStreamTestMixin, SCTPStreamBase):
3617n/a pass
3618n/a
3619n/a@requireAttrs(socket.socket, "sendmsg")
3620n/a@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
3621n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3622n/aclass SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
3623n/a pass
3624n/a
3625n/a@requireAttrs(socket.socket, "recvmsg")
3626n/a@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
3627n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3628n/aclass RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
3629n/a SendrecvmsgSCTPStreamTestBase):
3630n/a
3631n/a def testRecvmsgEOF(self):
3632n/a try:
3633n/a super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
3634n/a except OSError as e:
3635n/a if e.errno != errno.ENOTCONN:
3636n/a raise
3637n/a self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
3638n/a
3639n/a@requireAttrs(socket.socket, "recvmsg_into")
3640n/a@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
3641n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3642n/aclass RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
3643n/a SendrecvmsgSCTPStreamTestBase):
3644n/a
3645n/a def testRecvmsgEOF(self):
3646n/a try:
3647n/a super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
3648n/a except OSError as e:
3649n/a if e.errno != errno.ENOTCONN:
3650n/a raise
3651n/a self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
3652n/a
3653n/a
3654n/aclass SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
3655n/a ConnectedStreamTestMixin, UnixStreamBase):
3656n/a pass
3657n/a
3658n/a@requireAttrs(socket.socket, "sendmsg")
3659n/a@requireAttrs(socket, "AF_UNIX")
3660n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3661n/aclass SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
3662n/a pass
3663n/a
3664n/a@requireAttrs(socket.socket, "recvmsg")
3665n/a@requireAttrs(socket, "AF_UNIX")
3666n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3667n/aclass RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
3668n/a SendrecvmsgUnixStreamTestBase):
3669n/a pass
3670n/a
3671n/a@requireAttrs(socket.socket, "recvmsg_into")
3672n/a@requireAttrs(socket, "AF_UNIX")
3673n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3674n/aclass RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
3675n/a SendrecvmsgUnixStreamTestBase):
3676n/a pass
3677n/a
3678n/a@requireAttrs(socket.socket, "sendmsg", "recvmsg")
3679n/a@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
3680n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3681n/aclass RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
3682n/a pass
3683n/a
3684n/a@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
3685n/a@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
3686n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3687n/aclass RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
3688n/a SendrecvmsgUnixStreamTestBase):
3689n/a pass
3690n/a
3691n/a
3692n/a# Test interrupting the interruptible send/receive methods with a
3693n/a# signal when a timeout is set. These tests avoid having multiple
3694n/a# threads alive during the test so that the OS cannot deliver the
3695n/a# signal to the wrong one.
3696n/a
3697n/aclass InterruptedTimeoutBase(unittest.TestCase):
3698n/a # Base class for interrupted send/receive tests. Installs an
3699n/a # empty handler for SIGALRM and removes it on teardown, along with
3700n/a # any scheduled alarms.
3701n/a
3702n/a def setUp(self):
3703n/a super().setUp()
3704n/a orig_alrm_handler = signal.signal(signal.SIGALRM,
3705n/a lambda signum, frame: 1 / 0)
3706n/a self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
3707n/a self.addCleanup(self.setAlarm, 0)
3708n/a
3709n/a # Timeout for socket operations
3710n/a timeout = 4.0
3711n/a
3712n/a # Provide setAlarm() method to schedule delivery of SIGALRM after
3713n/a # given number of seconds, or cancel it if zero, and an
3714n/a # appropriate time value to use. Use setitimer() if available.
3715n/a if hasattr(signal, "setitimer"):
3716n/a alarm_time = 0.05
3717n/a
3718n/a def setAlarm(self, seconds):
3719n/a signal.setitimer(signal.ITIMER_REAL, seconds)
3720n/a else:
3721n/a # Old systems may deliver the alarm up to one second early
3722n/a alarm_time = 2
3723n/a
3724n/a def setAlarm(self, seconds):
3725n/a signal.alarm(seconds)
3726n/a
3727n/a
3728n/a# Require siginterrupt() in order to ensure that system calls are
3729n/a# interrupted by default.
3730n/a@requireAttrs(signal, "siginterrupt")
3731n/a@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
3732n/a "Don't have signal.alarm or signal.setitimer")
3733n/aclass InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
3734n/a # Test interrupting the recv*() methods with signals when a
3735n/a # timeout is set.
3736n/a
3737n/a def setUp(self):
3738n/a super().setUp()
3739n/a self.serv.settimeout(self.timeout)
3740n/a
3741n/a def checkInterruptedRecv(self, func, *args, **kwargs):
3742n/a # Check that func(*args, **kwargs) raises
3743n/a # errno of EINTR when interrupted by a signal.
3744n/a self.setAlarm(self.alarm_time)
3745n/a with self.assertRaises(ZeroDivisionError) as cm:
3746n/a func(*args, **kwargs)
3747n/a
3748n/a def testInterruptedRecvTimeout(self):
3749n/a self.checkInterruptedRecv(self.serv.recv, 1024)
3750n/a
3751n/a def testInterruptedRecvIntoTimeout(self):
3752n/a self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
3753n/a
3754n/a def testInterruptedRecvfromTimeout(self):
3755n/a self.checkInterruptedRecv(self.serv.recvfrom, 1024)
3756n/a
3757n/a def testInterruptedRecvfromIntoTimeout(self):
3758n/a self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
3759n/a
3760n/a @requireAttrs(socket.socket, "recvmsg")
3761n/a def testInterruptedRecvmsgTimeout(self):
3762n/a self.checkInterruptedRecv(self.serv.recvmsg, 1024)
3763n/a
3764n/a @requireAttrs(socket.socket, "recvmsg_into")
3765n/a def testInterruptedRecvmsgIntoTimeout(self):
3766n/a self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
3767n/a
3768n/a
3769n/a# Require siginterrupt() in order to ensure that system calls are
3770n/a# interrupted by default.
3771n/a@requireAttrs(signal, "siginterrupt")
3772n/a@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
3773n/a "Don't have signal.alarm or signal.setitimer")
3774n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3775n/aclass InterruptedSendTimeoutTest(InterruptedTimeoutBase,
3776n/a ThreadSafeCleanupTestCase,
3777n/a SocketListeningTestMixin, TCPTestBase):
3778n/a # Test interrupting the interruptible send*() methods with signals
3779n/a # when a timeout is set.
3780n/a
3781n/a def setUp(self):
3782n/a super().setUp()
3783n/a self.serv_conn = self.newSocket()
3784n/a self.addCleanup(self.serv_conn.close)
3785n/a # Use a thread to complete the connection, but wait for it to
3786n/a # terminate before running the test, so that there is only one
3787n/a # thread to accept the signal.
3788n/a cli_thread = threading.Thread(target=self.doConnect)
3789n/a cli_thread.start()
3790n/a self.cli_conn, addr = self.serv.accept()
3791n/a self.addCleanup(self.cli_conn.close)
3792n/a cli_thread.join()
3793n/a self.serv_conn.settimeout(self.timeout)
3794n/a
3795n/a def doConnect(self):
3796n/a self.serv_conn.connect(self.serv_addr)
3797n/a
3798n/a def checkInterruptedSend(self, func, *args, **kwargs):
3799n/a # Check that func(*args, **kwargs), run in a loop, raises
3800n/a # OSError with an errno of EINTR when interrupted by a
3801n/a # signal.
3802n/a with self.assertRaises(ZeroDivisionError) as cm:
3803n/a while True:
3804n/a self.setAlarm(self.alarm_time)
3805n/a func(*args, **kwargs)
3806n/a
3807n/a # Issue #12958: The following tests have problems on OS X prior to 10.7
3808n/a @support.requires_mac_ver(10, 7)
3809n/a def testInterruptedSendTimeout(self):
3810n/a self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
3811n/a
3812n/a @support.requires_mac_ver(10, 7)
3813n/a def testInterruptedSendtoTimeout(self):
3814n/a # Passing an actual address here as Python's wrapper for
3815n/a # sendto() doesn't allow passing a zero-length one; POSIX
3816n/a # requires that the address is ignored since the socket is
3817n/a # connection-mode, however.
3818n/a self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
3819n/a self.serv_addr)
3820n/a
3821n/a @support.requires_mac_ver(10, 7)
3822n/a @requireAttrs(socket.socket, "sendmsg")
3823n/a def testInterruptedSendmsgTimeout(self):
3824n/a self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
3825n/a
3826n/a
3827n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3828n/aclass TCPCloserTest(ThreadedTCPSocketTest):
3829n/a
3830n/a def testClose(self):
3831n/a conn, addr = self.serv.accept()
3832n/a conn.close()
3833n/a
3834n/a sd = self.cli
3835n/a read, write, err = select.select([sd], [], [], 1.0)
3836n/a self.assertEqual(read, [sd])
3837n/a self.assertEqual(sd.recv(1), b'')
3838n/a
3839n/a # Calling close() many times should be safe.
3840n/a conn.close()
3841n/a conn.close()
3842n/a
3843n/a def _testClose(self):
3844n/a self.cli.connect((HOST, self.port))
3845n/a time.sleep(1.0)
3846n/a
3847n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3848n/aclass BasicSocketPairTest(SocketPairTest):
3849n/a
3850n/a def __init__(self, methodName='runTest'):
3851n/a SocketPairTest.__init__(self, methodName=methodName)
3852n/a
3853n/a def _check_defaults(self, sock):
3854n/a self.assertIsInstance(sock, socket.socket)
3855n/a if hasattr(socket, 'AF_UNIX'):
3856n/a self.assertEqual(sock.family, socket.AF_UNIX)
3857n/a else:
3858n/a self.assertEqual(sock.family, socket.AF_INET)
3859n/a self.assertEqual(sock.type, socket.SOCK_STREAM)
3860n/a self.assertEqual(sock.proto, 0)
3861n/a
3862n/a def _testDefaults(self):
3863n/a self._check_defaults(self.cli)
3864n/a
3865n/a def testDefaults(self):
3866n/a self._check_defaults(self.serv)
3867n/a
3868n/a def testRecv(self):
3869n/a msg = self.serv.recv(1024)
3870n/a self.assertEqual(msg, MSG)
3871n/a
3872n/a def _testRecv(self):
3873n/a self.cli.send(MSG)
3874n/a
3875n/a def testSend(self):
3876n/a self.serv.send(MSG)
3877n/a
3878n/a def _testSend(self):
3879n/a msg = self.cli.recv(1024)
3880n/a self.assertEqual(msg, MSG)
3881n/a
3882n/a@unittest.skipUnless(thread, 'Threading required for this test.')
3883n/aclass NonBlockingTCPTests(ThreadedTCPSocketTest):
3884n/a
3885n/a def __init__(self, methodName='runTest'):
3886n/a ThreadedTCPSocketTest.__init__(self, methodName=methodName)
3887n/a
3888n/a def testSetBlocking(self):
3889n/a # Testing whether set blocking works
3890n/a self.serv.setblocking(True)
3891n/a self.assertIsNone(self.serv.gettimeout())
3892n/a self.serv.setblocking(False)
3893n/a self.assertEqual(self.serv.gettimeout(), 0.0)
3894n/a start = time.time()
3895n/a try:
3896n/a self.serv.accept()
3897n/a except OSError:
3898n/a pass
3899n/a end = time.time()
3900n/a self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
3901n/a
3902n/a def _testSetBlocking(self):
3903n/a pass
3904n/a
3905n/a @support.cpython_only
3906n/a def testSetBlocking_overflow(self):
3907n/a # Issue 15989
3908n/a import _testcapi
3909n/a if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
3910n/a self.skipTest('needs UINT_MAX < ULONG_MAX')
3911n/a self.serv.setblocking(False)
3912n/a self.assertEqual(self.serv.gettimeout(), 0.0)
3913n/a self.serv.setblocking(_testcapi.UINT_MAX + 1)
3914n/a self.assertIsNone(self.serv.gettimeout())
3915n/a
3916n/a _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)
3917n/a
3918n/a @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
3919n/a 'test needs socket.SOCK_NONBLOCK')
3920n/a @support.requires_linux_version(2, 6, 28)
3921n/a def testInitNonBlocking(self):
3922n/a # reinit server socket
3923n/a self.serv.close()
3924n/a self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM |
3925n/a socket.SOCK_NONBLOCK)
3926n/a self.port = support.bind_port(self.serv)
3927n/a self.serv.listen()
3928n/a # actual testing
3929n/a start = time.time()
3930n/a try:
3931n/a self.serv.accept()
3932n/a except OSError:
3933n/a pass
3934n/a end = time.time()
3935n/a self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
3936n/a
3937n/a def _testInitNonBlocking(self):
3938n/a pass
3939n/a
3940n/a def testInheritFlags(self):
3941n/a # Issue #7995: when calling accept() on a listening socket with a
3942n/a # timeout, the resulting socket should not be non-blocking.
3943n/a self.serv.settimeout(10)
3944n/a try:
3945n/a conn, addr = self.serv.accept()
3946n/a message = conn.recv(len(MSG))
3947n/a finally:
3948n/a conn.close()
3949n/a self.serv.settimeout(None)
3950n/a
3951n/a def _testInheritFlags(self):
3952n/a time.sleep(0.1)
3953n/a self.cli.connect((HOST, self.port))
3954n/a time.sleep(0.5)
3955n/a self.cli.send(MSG)
3956n/a
3957n/a def testAccept(self):
3958n/a # Testing non-blocking accept
3959n/a self.serv.setblocking(0)
3960n/a try:
3961n/a conn, addr = self.serv.accept()
3962n/a except OSError:
3963n/a pass
3964n/a else:
3965n/a self.fail("Error trying to do non-blocking accept.")
3966n/a read, write, err = select.select([self.serv], [], [])
3967n/a if self.serv in read:
3968n/a conn, addr = self.serv.accept()
3969n/a self.assertIsNone(conn.gettimeout())
3970n/a conn.close()
3971n/a else:
3972n/a self.fail("Error trying to do accept after select.")
3973n/a
3974n/a def _testAccept(self):
3975n/a time.sleep(0.1)
3976n/a self.cli.connect((HOST, self.port))
3977n/a
3978n/a def testConnect(self):
3979n/a # Testing non-blocking connect
3980n/a conn, addr = self.serv.accept()
3981n/a conn.close()
3982n/a
3983n/a def _testConnect(self):
3984n/a self.cli.settimeout(10)
3985n/a self.cli.connect((HOST, self.port))
3986n/a
3987n/a def testRecv(self):
3988n/a # Testing non-blocking recv
3989n/a conn, addr = self.serv.accept()
3990n/a conn.setblocking(0)
3991n/a try:
3992n/a msg = conn.recv(len(MSG))
3993n/a except OSError:
3994n/a pass
3995n/a else:
3996n/a self.fail("Error trying to do non-blocking recv.")
3997n/a read, write, err = select.select([conn], [], [])
3998n/a if conn in read:
3999n/a msg = conn.recv(len(MSG))
4000n/a conn.close()
4001n/a self.assertEqual(msg, MSG)
4002n/a else:
4003n/a self.fail("Error during select call to non-blocking socket.")
4004n/a
4005n/a def _testRecv(self):
4006n/a self.cli.connect((HOST, self.port))
4007n/a time.sleep(0.1)
4008n/a self.cli.send(MSG)
4009n/a
4010n/a@unittest.skipUnless(thread, 'Threading required for this test.')
4011n/aclass FileObjectClassTestCase(SocketConnectedTest):
4012n/a """Unit tests for the object returned by socket.makefile()
4013n/a
4014n/a self.read_file is the io object returned by makefile() on
4015n/a the client connection. You can read from this file to
4016n/a get output from the server.
4017n/a
4018n/a self.write_file is the io object returned by makefile() on the
4019n/a server connection. You can write to this file to send output
4020n/a to the client.
4021n/a """
4022n/a
4023n/a bufsize = -1 # Use default buffer size
4024n/a encoding = 'utf-8'
4025n/a errors = 'strict'
4026n/a newline = None
4027n/a
4028n/a read_mode = 'rb'
4029n/a read_msg = MSG
4030n/a write_mode = 'wb'
4031n/a write_msg = MSG
4032n/a
4033n/a def __init__(self, methodName='runTest'):
4034n/a SocketConnectedTest.__init__(self, methodName=methodName)
4035n/a
4036n/a def setUp(self):
4037n/a self.evt1, self.evt2, self.serv_finished, self.cli_finished = [
4038n/a threading.Event() for i in range(4)]
4039n/a SocketConnectedTest.setUp(self)
4040n/a self.read_file = self.cli_conn.makefile(
4041n/a self.read_mode, self.bufsize,
4042n/a encoding = self.encoding,
4043n/a errors = self.errors,
4044n/a newline = self.newline)
4045n/a
4046n/a def tearDown(self):
4047n/a self.serv_finished.set()
4048n/a self.read_file.close()
4049n/a self.assertTrue(self.read_file.closed)
4050n/a self.read_file = None
4051n/a SocketConnectedTest.tearDown(self)
4052n/a
4053n/a def clientSetUp(self):
4054n/a SocketConnectedTest.clientSetUp(self)
4055n/a self.write_file = self.serv_conn.makefile(
4056n/a self.write_mode, self.bufsize,
4057n/a encoding = self.encoding,
4058n/a errors = self.errors,
4059n/a newline = self.newline)
4060n/a
4061n/a def clientTearDown(self):
4062n/a self.cli_finished.set()
4063n/a self.write_file.close()
4064n/a self.assertTrue(self.write_file.closed)
4065n/a self.write_file = None
4066n/a SocketConnectedTest.clientTearDown(self)
4067n/a
4068n/a def testReadAfterTimeout(self):
4069n/a # Issue #7322: A file object must disallow further reads
4070n/a # after a timeout has occurred.
4071n/a self.cli_conn.settimeout(1)
4072n/a self.read_file.read(3)
4073n/a # First read raises a timeout
4074n/a self.assertRaises(socket.timeout, self.read_file.read, 1)
4075n/a # Second read is disallowed
4076n/a with self.assertRaises(OSError) as ctx:
4077n/a self.read_file.read(1)
4078n/a self.assertIn("cannot read from timed out object", str(ctx.exception))
4079n/a
4080n/a def _testReadAfterTimeout(self):
4081n/a self.write_file.write(self.write_msg[0:3])
4082n/a self.write_file.flush()
4083n/a self.serv_finished.wait()
4084n/a
4085n/a def testSmallRead(self):
4086n/a # Performing small file read test
4087n/a first_seg = self.read_file.read(len(self.read_msg)-3)
4088n/a second_seg = self.read_file.read(3)
4089n/a msg = first_seg + second_seg
4090n/a self.assertEqual(msg, self.read_msg)
4091n/a
4092n/a def _testSmallRead(self):
4093n/a self.write_file.write(self.write_msg)
4094n/a self.write_file.flush()
4095n/a
4096n/a def testFullRead(self):
4097n/a # read until EOF
4098n/a msg = self.read_file.read()
4099n/a self.assertEqual(msg, self.read_msg)
4100n/a
4101n/a def _testFullRead(self):
4102n/a self.write_file.write(self.write_msg)
4103n/a self.write_file.close()
4104n/a
4105n/a def testUnbufferedRead(self):
4106n/a # Performing unbuffered file read test
4107n/a buf = type(self.read_msg)()
4108n/a while 1:
4109n/a char = self.read_file.read(1)
4110n/a if not char:
4111n/a break
4112n/a buf += char
4113n/a self.assertEqual(buf, self.read_msg)
4114n/a
4115n/a def _testUnbufferedRead(self):
4116n/a self.write_file.write(self.write_msg)
4117n/a self.write_file.flush()
4118n/a
4119n/a def testReadline(self):
4120n/a # Performing file readline test
4121n/a line = self.read_file.readline()
4122n/a self.assertEqual(line, self.read_msg)
4123n/a
4124n/a def _testReadline(self):
4125n/a self.write_file.write(self.write_msg)
4126n/a self.write_file.flush()
4127n/a
4128n/a def testCloseAfterMakefile(self):
4129n/a # The file returned by makefile should keep the socket open.
4130n/a self.cli_conn.close()
4131n/a # read until EOF
4132n/a msg = self.read_file.read()
4133n/a self.assertEqual(msg, self.read_msg)
4134n/a
4135n/a def _testCloseAfterMakefile(self):
4136n/a self.write_file.write(self.write_msg)
4137n/a self.write_file.flush()
4138n/a
4139n/a def testMakefileAfterMakefileClose(self):
4140n/a self.read_file.close()
4141n/a msg = self.cli_conn.recv(len(MSG))
4142n/a if isinstance(self.read_msg, str):
4143n/a msg = msg.decode()
4144n/a self.assertEqual(msg, self.read_msg)
4145n/a
4146n/a def _testMakefileAfterMakefileClose(self):
4147n/a self.write_file.write(self.write_msg)
4148n/a self.write_file.flush()
4149n/a
4150n/a def testClosedAttr(self):
4151n/a self.assertTrue(not self.read_file.closed)
4152n/a
4153n/a def _testClosedAttr(self):
4154n/a self.assertTrue(not self.write_file.closed)
4155n/a
4156n/a def testAttributes(self):
4157n/a self.assertEqual(self.read_file.mode, self.read_mode)
4158n/a self.assertEqual(self.read_file.name, self.cli_conn.fileno())
4159n/a
4160n/a def _testAttributes(self):
4161n/a self.assertEqual(self.write_file.mode, self.write_mode)
4162n/a self.assertEqual(self.write_file.name, self.serv_conn.fileno())
4163n/a
4164n/a def testRealClose(self):
4165n/a self.read_file.close()
4166n/a self.assertRaises(ValueError, self.read_file.fileno)
4167n/a self.cli_conn.close()
4168n/a self.assertRaises(OSError, self.cli_conn.getsockname)
4169n/a
4170n/a def _testRealClose(self):
4171n/a pass
4172n/a
4173n/a
4174n/aclass UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
4175n/a
4176n/a """Repeat the tests from FileObjectClassTestCase with bufsize==0.
4177n/a
4178n/a In this case (and in this case only), it should be possible to
4179n/a create a file object, read a line from it, create another file
4180n/a object, read another line from it, without loss of data in the
4181n/a first file object's buffer. Note that http.client relies on this
4182n/a when reading multiple requests from the same socket."""
4183n/a
4184n/a bufsize = 0 # Use unbuffered mode
4185n/a
4186n/a def testUnbufferedReadline(self):
4187n/a # Read a line, create a new file object, read another line with it
4188n/a line = self.read_file.readline() # first line
4189n/a self.assertEqual(line, b"A. " + self.write_msg) # first line
4190n/a self.read_file = self.cli_conn.makefile('rb', 0)
4191n/a line = self.read_file.readline() # second line
4192n/a self.assertEqual(line, b"B. " + self.write_msg) # second line
4193n/a
4194n/a def _testUnbufferedReadline(self):
4195n/a self.write_file.write(b"A. " + self.write_msg)
4196n/a self.write_file.write(b"B. " + self.write_msg)
4197n/a self.write_file.flush()
4198n/a
4199n/a def testMakefileClose(self):
4200n/a # The file returned by makefile should keep the socket open...
4201n/a self.cli_conn.close()
4202n/a msg = self.cli_conn.recv(1024)
4203n/a self.assertEqual(msg, self.read_msg)
4204n/a # ...until the file is itself closed
4205n/a self.read_file.close()
4206n/a self.assertRaises(OSError, self.cli_conn.recv, 1024)
4207n/a
4208n/a def _testMakefileClose(self):
4209n/a self.write_file.write(self.write_msg)
4210n/a self.write_file.flush()
4211n/a
4212n/a def testMakefileCloseSocketDestroy(self):
4213n/a refcount_before = sys.getrefcount(self.cli_conn)
4214n/a self.read_file.close()
4215n/a refcount_after = sys.getrefcount(self.cli_conn)
4216n/a self.assertEqual(refcount_before - 1, refcount_after)
4217n/a
4218n/a def _testMakefileCloseSocketDestroy(self):
4219n/a pass
4220n/a
4221n/a # Non-blocking ops
4222n/a # NOTE: to set `read_file` as non-blocking, we must call
4223n/a # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).
4224n/a
4225n/a def testSmallReadNonBlocking(self):
4226n/a self.cli_conn.setblocking(False)
4227n/a self.assertEqual(self.read_file.readinto(bytearray(10)), None)
4228n/a self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None)
4229n/a self.evt1.set()
4230n/a self.evt2.wait(1.0)
4231n/a first_seg = self.read_file.read(len(self.read_msg) - 3)
4232n/a if first_seg is None:
4233n/a # Data not arrived (can happen under Windows), wait a bit
4234n/a time.sleep(0.5)
4235n/a first_seg = self.read_file.read(len(self.read_msg) - 3)
4236n/a buf = bytearray(10)
4237n/a n = self.read_file.readinto(buf)
4238n/a self.assertEqual(n, 3)
4239n/a msg = first_seg + buf[:n]
4240n/a self.assertEqual(msg, self.read_msg)
4241n/a self.assertEqual(self.read_file.readinto(bytearray(16)), None)
4242n/a self.assertEqual(self.read_file.read(1), None)
4243n/a
4244n/a def _testSmallReadNonBlocking(self):
4245n/a self.evt1.wait(1.0)
4246n/a self.write_file.write(self.write_msg)
4247n/a self.write_file.flush()
4248n/a self.evt2.set()
4249n/a # Avoid cloding the socket before the server test has finished,
4250n/a # otherwise system recv() will return 0 instead of EWOULDBLOCK.
4251n/a self.serv_finished.wait(5.0)
4252n/a
4253n/a def testWriteNonBlocking(self):
4254n/a self.cli_finished.wait(5.0)
4255n/a # The client thread can't skip directly - the SkipTest exception
4256n/a # would appear as a failure.
4257n/a if self.serv_skipped:
4258n/a self.skipTest(self.serv_skipped)
4259n/a
4260n/a def _testWriteNonBlocking(self):
4261n/a self.serv_skipped = None
4262n/a self.serv_conn.setblocking(False)
4263n/a # Try to saturate the socket buffer pipe with repeated large writes.
4264n/a BIG = b"x" * support.SOCK_MAX_SIZE
4265n/a LIMIT = 10
4266n/a # The first write() succeeds since a chunk of data can be buffered
4267n/a n = self.write_file.write(BIG)
4268n/a self.assertGreater(n, 0)
4269n/a for i in range(LIMIT):
4270n/a n = self.write_file.write(BIG)
4271n/a if n is None:
4272n/a # Succeeded
4273n/a break
4274n/a self.assertGreater(n, 0)
4275n/a else:
4276n/a # Let us know that this test didn't manage to establish
4277n/a # the expected conditions. This is not a failure in itself but,
4278n/a # if it happens repeatedly, the test should be fixed.
4279n/a self.serv_skipped = "failed to saturate the socket buffer"
4280n/a
4281n/a
4282n/aclass LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
4283n/a
4284n/a bufsize = 1 # Default-buffered for reading; line-buffered for writing
4285n/a
4286n/a
4287n/aclass SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
4288n/a
4289n/a bufsize = 2 # Exercise the buffering code
4290n/a
4291n/a
4292n/aclass UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
4293n/a """Tests for socket.makefile() in text mode (rather than binary)"""
4294n/a
4295n/a read_mode = 'r'
4296n/a read_msg = MSG.decode('utf-8')
4297n/a write_mode = 'wb'
4298n/a write_msg = MSG
4299n/a newline = ''
4300n/a
4301n/a
4302n/aclass UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
4303n/a """Tests for socket.makefile() in text mode (rather than binary)"""
4304n/a
4305n/a read_mode = 'rb'
4306n/a read_msg = MSG
4307n/a write_mode = 'w'
4308n/a write_msg = MSG.decode('utf-8')
4309n/a newline = ''
4310n/a
4311n/a
4312n/aclass UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
4313n/a """Tests for socket.makefile() in text mode (rather than binary)"""
4314n/a
4315n/a read_mode = 'r'
4316n/a read_msg = MSG.decode('utf-8')
4317n/a write_mode = 'w'
4318n/a write_msg = MSG.decode('utf-8')
4319n/a newline = ''
4320n/a
4321n/a
4322n/aclass NetworkConnectionTest(object):
4323n/a """Prove network connection."""
4324n/a
4325n/a def clientSetUp(self):
4326n/a # We're inherited below by BasicTCPTest2, which also inherits
4327n/a # BasicTCPTest, which defines self.port referenced below.
4328n/a self.cli = socket.create_connection((HOST, self.port))
4329n/a self.serv_conn = self.cli
4330n/a
4331n/aclass BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
4332n/a """Tests that NetworkConnection does not break existing TCP functionality.
4333n/a """
4334n/a
4335n/aclass NetworkConnectionNoServer(unittest.TestCase):
4336n/a
4337n/a class MockSocket(socket.socket):
4338n/a def connect(self, *args):
4339n/a raise socket.timeout('timed out')
4340n/a
4341n/a @contextlib.contextmanager
4342n/a def mocked_socket_module(self):
4343n/a """Return a socket which times out on connect"""
4344n/a old_socket = socket.socket
4345n/a socket.socket = self.MockSocket
4346n/a try:
4347n/a yield
4348n/a finally:
4349n/a socket.socket = old_socket
4350n/a
4351n/a def test_connect(self):
4352n/a port = support.find_unused_port()
4353n/a cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4354n/a self.addCleanup(cli.close)
4355n/a with self.assertRaises(OSError) as cm:
4356n/a cli.connect((HOST, port))
4357n/a self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
4358n/a
4359n/a def test_create_connection(self):
4360n/a # Issue #9792: errors raised by create_connection() should have
4361n/a # a proper errno attribute.
4362n/a port = support.find_unused_port()
4363n/a with self.assertRaises(OSError) as cm:
4364n/a socket.create_connection((HOST, port))
4365n/a
4366n/a # Issue #16257: create_connection() calls getaddrinfo() against
4367n/a # 'localhost'. This may result in an IPV6 addr being returned
4368n/a # as well as an IPV4 one:
4369n/a # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
4370n/a # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)),
4371n/a # (26, 2, 0, '', ('::1', 41230, 0, 0))]
4372n/a #
4373n/a # create_connection() enumerates through all the addresses returned
4374n/a # and if it doesn't successfully bind to any of them, it propagates
4375n/a # the last exception it encountered.
4376n/a #
4377n/a # On Solaris, ENETUNREACH is returned in this circumstance instead
4378n/a # of ECONNREFUSED. So, if that errno exists, add it to our list of
4379n/a # expected errnos.
4380n/a expected_errnos = [ errno.ECONNREFUSED, ]
4381n/a if hasattr(errno, 'ENETUNREACH'):
4382n/a expected_errnos.append(errno.ENETUNREACH)
4383n/a
4384n/a self.assertIn(cm.exception.errno, expected_errnos)
4385n/a
4386n/a def test_create_connection_timeout(self):
4387n/a # Issue #9792: create_connection() should not recast timeout errors
4388n/a # as generic socket errors.
4389n/a with self.mocked_socket_module():
4390n/a with self.assertRaises(socket.timeout):
4391n/a socket.create_connection((HOST, 1234))
4392n/a
4393n/a
4394n/a@unittest.skipUnless(thread, 'Threading required for this test.')
4395n/aclass NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
4396n/a
4397n/a def __init__(self, methodName='runTest'):
4398n/a SocketTCPTest.__init__(self, methodName=methodName)
4399n/a ThreadableTest.__init__(self)
4400n/a
4401n/a def clientSetUp(self):
4402n/a self.source_port = support.find_unused_port()
4403n/a
4404n/a def clientTearDown(self):
4405n/a self.cli.close()
4406n/a self.cli = None
4407n/a ThreadableTest.clientTearDown(self)
4408n/a
4409n/a def _justAccept(self):
4410n/a conn, addr = self.serv.accept()
4411n/a conn.close()
4412n/a
4413n/a testFamily = _justAccept
4414n/a def _testFamily(self):
4415n/a self.cli = socket.create_connection((HOST, self.port), timeout=30)
4416n/a self.addCleanup(self.cli.close)
4417n/a self.assertEqual(self.cli.family, 2)
4418n/a
4419n/a testSourceAddress = _justAccept
4420n/a def _testSourceAddress(self):
4421n/a self.cli = socket.create_connection((HOST, self.port), timeout=30,
4422n/a source_address=('', self.source_port))
4423n/a self.addCleanup(self.cli.close)
4424n/a self.assertEqual(self.cli.getsockname()[1], self.source_port)
4425n/a # The port number being used is sufficient to show that the bind()
4426n/a # call happened.
4427n/a
4428n/a testTimeoutDefault = _justAccept
4429n/a def _testTimeoutDefault(self):
4430n/a # passing no explicit timeout uses socket's global default
4431n/a self.assertTrue(socket.getdefaulttimeout() is None)
4432n/a socket.setdefaulttimeout(42)
4433n/a try:
4434n/a self.cli = socket.create_connection((HOST, self.port))
4435n/a self.addCleanup(self.cli.close)
4436n/a finally:
4437n/a socket.setdefaulttimeout(None)
4438n/a self.assertEqual(self.cli.gettimeout(), 42)
4439n/a
4440n/a testTimeoutNone = _justAccept
4441n/a def _testTimeoutNone(self):
4442n/a # None timeout means the same as sock.settimeout(None)
4443n/a self.assertTrue(socket.getdefaulttimeout() is None)
4444n/a socket.setdefaulttimeout(30)
4445n/a try:
4446n/a self.cli = socket.create_connection((HOST, self.port), timeout=None)
4447n/a self.addCleanup(self.cli.close)
4448n/a finally:
4449n/a socket.setdefaulttimeout(None)
4450n/a self.assertEqual(self.cli.gettimeout(), None)
4451n/a
4452n/a testTimeoutValueNamed = _justAccept
4453n/a def _testTimeoutValueNamed(self):
4454n/a self.cli = socket.create_connection((HOST, self.port), timeout=30)
4455n/a self.assertEqual(self.cli.gettimeout(), 30)
4456n/a
4457n/a testTimeoutValueNonamed = _justAccept
4458n/a def _testTimeoutValueNonamed(self):
4459n/a self.cli = socket.create_connection((HOST, self.port), 30)
4460n/a self.addCleanup(self.cli.close)
4461n/a self.assertEqual(self.cli.gettimeout(), 30)
4462n/a
4463n/a@unittest.skipUnless(thread, 'Threading required for this test.')
4464n/aclass NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
4465n/a
4466n/a def __init__(self, methodName='runTest'):
4467n/a SocketTCPTest.__init__(self, methodName=methodName)
4468n/a ThreadableTest.__init__(self)
4469n/a
4470n/a def clientSetUp(self):
4471n/a pass
4472n/a
4473n/a def clientTearDown(self):
4474n/a self.cli.close()
4475n/a self.cli = None
4476n/a ThreadableTest.clientTearDown(self)
4477n/a
4478n/a def testInsideTimeout(self):
4479n/a conn, addr = self.serv.accept()
4480n/a self.addCleanup(conn.close)
4481n/a time.sleep(3)
4482n/a conn.send(b"done!")
4483n/a testOutsideTimeout = testInsideTimeout
4484n/a
4485n/a def _testInsideTimeout(self):
4486n/a self.cli = sock = socket.create_connection((HOST, self.port))
4487n/a data = sock.recv(5)
4488n/a self.assertEqual(data, b"done!")
4489n/a
4490n/a def _testOutsideTimeout(self):
4491n/a self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
4492n/a self.assertRaises(socket.timeout, lambda: sock.recv(5))
4493n/a
4494n/a
4495n/aclass TCPTimeoutTest(SocketTCPTest):
4496n/a
4497n/a def testTCPTimeout(self):
4498n/a def raise_timeout(*args, **kwargs):
4499n/a self.serv.settimeout(1.0)
4500n/a self.serv.accept()
4501n/a self.assertRaises(socket.timeout, raise_timeout,
4502n/a "Error generating a timeout exception (TCP)")
4503n/a
4504n/a def testTimeoutZero(self):
4505n/a ok = False
4506n/a try:
4507n/a self.serv.settimeout(0.0)
4508n/a foo = self.serv.accept()
4509n/a except socket.timeout:
4510n/a self.fail("caught timeout instead of error (TCP)")
4511n/a except OSError:
4512n/a ok = True
4513n/a except:
4514n/a self.fail("caught unexpected exception (TCP)")
4515n/a if not ok:
4516n/a self.fail("accept() returned success when we did not expect it")
4517n/a
4518n/a @unittest.skipUnless(hasattr(signal, 'alarm'),
4519n/a 'test needs signal.alarm()')
4520n/a def testInterruptedTimeout(self):
4521n/a # XXX I don't know how to do this test on MSWindows or any other
4522n/a # plaform that doesn't support signal.alarm() or os.kill(), though
4523n/a # the bug should have existed on all platforms.
4524n/a self.serv.settimeout(5.0) # must be longer than alarm
4525n/a class Alarm(Exception):
4526n/a pass
4527n/a def alarm_handler(signal, frame):
4528n/a raise Alarm
4529n/a old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
4530n/a try:
4531n/a signal.alarm(2) # POSIX allows alarm to be up to 1 second early
4532n/a try:
4533n/a foo = self.serv.accept()
4534n/a except socket.timeout:
4535n/a self.fail("caught timeout instead of Alarm")
4536n/a except Alarm:
4537n/a pass
4538n/a except:
4539n/a self.fail("caught other exception instead of Alarm:"
4540n/a " %s(%s):\n%s" %
4541n/a (sys.exc_info()[:2] + (traceback.format_exc(),)))
4542n/a else:
4543n/a self.fail("nothing caught")
4544n/a finally:
4545n/a signal.alarm(0) # shut off alarm
4546n/a except Alarm:
4547n/a self.fail("got Alarm in wrong place")
4548n/a finally:
4549n/a # no alarm can be pending. Safe to restore old handler.
4550n/a signal.signal(signal.SIGALRM, old_alarm)
4551n/a
4552n/aclass UDPTimeoutTest(SocketUDPTest):
4553n/a
4554n/a def testUDPTimeout(self):
4555n/a def raise_timeout(*args, **kwargs):
4556n/a self.serv.settimeout(1.0)
4557n/a self.serv.recv(1024)
4558n/a self.assertRaises(socket.timeout, raise_timeout,
4559n/a "Error generating a timeout exception (UDP)")
4560n/a
4561n/a def testTimeoutZero(self):
4562n/a ok = False
4563n/a try:
4564n/a self.serv.settimeout(0.0)
4565n/a foo = self.serv.recv(1024)
4566n/a except socket.timeout:
4567n/a self.fail("caught timeout instead of error (UDP)")
4568n/a except OSError:
4569n/a ok = True
4570n/a except:
4571n/a self.fail("caught unexpected exception (UDP)")
4572n/a if not ok:
4573n/a self.fail("recv() returned success when we did not expect it")
4574n/a
4575n/aclass TestExceptions(unittest.TestCase):
4576n/a
4577n/a def testExceptionTree(self):
4578n/a self.assertTrue(issubclass(OSError, Exception))
4579n/a self.assertTrue(issubclass(socket.herror, OSError))
4580n/a self.assertTrue(issubclass(socket.gaierror, OSError))
4581n/a self.assertTrue(issubclass(socket.timeout, OSError))
4582n/a
4583n/a def test_setblocking_invalidfd(self):
4584n/a # Regression test for issue #28471
4585n/a
4586n/a sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
4587n/a sock = socket.socket(
4588n/a socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
4589n/a sock0.close()
4590n/a self.addCleanup(sock.detach)
4591n/a
4592n/a with self.assertRaises(OSError):
4593n/a sock.setblocking(False)
4594n/a
4595n/a
4596n/a@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
4597n/aclass TestLinuxAbstractNamespace(unittest.TestCase):
4598n/a
4599n/a UNIX_PATH_MAX = 108
4600n/a
4601n/a def testLinuxAbstractNamespace(self):
4602n/a address = b"\x00python-test-hello\x00\xff"
4603n/a with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1:
4604n/a s1.bind(address)
4605n/a s1.listen()
4606n/a with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2:
4607n/a s2.connect(s1.getsockname())
4608n/a with s1.accept()[0] as s3:
4609n/a self.assertEqual(s1.getsockname(), address)
4610n/a self.assertEqual(s2.getpeername(), address)
4611n/a
4612n/a def testMaxName(self):
4613n/a address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1)
4614n/a with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
4615n/a s.bind(address)
4616n/a self.assertEqual(s.getsockname(), address)
4617n/a
4618n/a def testNameOverflow(self):
4619n/a address = "\x00" + "h" * self.UNIX_PATH_MAX
4620n/a with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
4621n/a self.assertRaises(OSError, s.bind, address)
4622n/a
4623n/a def testStrName(self):
4624n/a # Check that an abstract name can be passed as a string.
4625n/a s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
4626n/a try:
4627n/a s.bind("\x00python\x00test\x00")
4628n/a self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
4629n/a finally:
4630n/a s.close()
4631n/a
4632n/a def testBytearrayName(self):
4633n/a # Check that an abstract name can be passed as a bytearray.
4634n/a with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
4635n/a s.bind(bytearray(b"\x00python\x00test\x00"))
4636n/a self.assertEqual(s.getsockname(), b"\x00python\x00test\x00")
4637n/a
4638n/a@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
4639n/aclass TestUnixDomain(unittest.TestCase):
4640n/a
4641n/a def setUp(self):
4642n/a self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
4643n/a
4644n/a def tearDown(self):
4645n/a self.sock.close()
4646n/a
4647n/a def encoded(self, path):
4648n/a # Return the given path encoded in the file system encoding,
4649n/a # or skip the test if this is not possible.
4650n/a try:
4651n/a return os.fsencode(path)
4652n/a except UnicodeEncodeError:
4653n/a self.skipTest(
4654n/a "Pathname {0!a} cannot be represented in file "
4655n/a "system encoding {1!r}".format(
4656n/a path, sys.getfilesystemencoding()))
4657n/a
4658n/a def bind(self, sock, path):
4659n/a # Bind the socket
4660n/a try:
4661n/a support.bind_unix_socket(sock, path)
4662n/a except OSError as e:
4663n/a if str(e) == "AF_UNIX path too long":
4664n/a self.skipTest(
4665n/a "Pathname {0!a} is too long to serve as an AF_UNIX path"
4666n/a .format(path))
4667n/a else:
4668n/a raise
4669n/a
4670n/a def testStrAddr(self):
4671n/a # Test binding to and retrieving a normal string pathname.
4672n/a path = os.path.abspath(support.TESTFN)
4673n/a self.bind(self.sock, path)
4674n/a self.addCleanup(support.unlink, path)
4675n/a self.assertEqual(self.sock.getsockname(), path)
4676n/a
4677n/a def testBytesAddr(self):
4678n/a # Test binding to a bytes pathname.
4679n/a path = os.path.abspath(support.TESTFN)
4680n/a self.bind(self.sock, self.encoded(path))
4681n/a self.addCleanup(support.unlink, path)
4682n/a self.assertEqual(self.sock.getsockname(), path)
4683n/a
4684n/a def testSurrogateescapeBind(self):
4685n/a # Test binding to a valid non-ASCII pathname, with the
4686n/a # non-ASCII bytes supplied using surrogateescape encoding.
4687n/a path = os.path.abspath(support.TESTFN_UNICODE)
4688n/a b = self.encoded(path)
4689n/a self.bind(self.sock, b.decode("ascii", "surrogateescape"))
4690n/a self.addCleanup(support.unlink, path)
4691n/a self.assertEqual(self.sock.getsockname(), path)
4692n/a
4693n/a def testUnencodableAddr(self):
4694n/a # Test binding to a pathname that cannot be encoded in the
4695n/a # file system encoding.
4696n/a if support.TESTFN_UNENCODABLE is None:
4697n/a self.skipTest("No unencodable filename available")
4698n/a path = os.path.abspath(support.TESTFN_UNENCODABLE)
4699n/a self.bind(self.sock, path)
4700n/a self.addCleanup(support.unlink, path)
4701n/a self.assertEqual(self.sock.getsockname(), path)
4702n/a
4703n/a@unittest.skipUnless(thread, 'Threading required for this test.')
4704n/aclass BufferIOTest(SocketConnectedTest):
4705n/a """
4706n/a Test the buffer versions of socket.recv() and socket.send().
4707n/a """
4708n/a def __init__(self, methodName='runTest'):
4709n/a SocketConnectedTest.__init__(self, methodName=methodName)
4710n/a
4711n/a def testRecvIntoArray(self):
4712n/a buf = array.array("B", [0] * len(MSG))
4713n/a nbytes = self.cli_conn.recv_into(buf)
4714n/a self.assertEqual(nbytes, len(MSG))
4715n/a buf = buf.tobytes()
4716n/a msg = buf[:len(MSG)]
4717n/a self.assertEqual(msg, MSG)
4718n/a
4719n/a def _testRecvIntoArray(self):
4720n/a buf = bytes(MSG)
4721n/a self.serv_conn.send(buf)
4722n/a
4723n/a def testRecvIntoBytearray(self):
4724n/a buf = bytearray(1024)
4725n/a nbytes = self.cli_conn.recv_into(buf)
4726n/a self.assertEqual(nbytes, len(MSG))
4727n/a msg = buf[:len(MSG)]
4728n/a self.assertEqual(msg, MSG)
4729n/a
4730n/a _testRecvIntoBytearray = _testRecvIntoArray
4731n/a
4732n/a def testRecvIntoMemoryview(self):
4733n/a buf = bytearray(1024)
4734n/a nbytes = self.cli_conn.recv_into(memoryview(buf))
4735n/a self.assertEqual(nbytes, len(MSG))
4736n/a msg = buf[:len(MSG)]
4737n/a self.assertEqual(msg, MSG)
4738n/a
4739n/a _testRecvIntoMemoryview = _testRecvIntoArray
4740n/a
4741n/a def testRecvFromIntoArray(self):
4742n/a buf = array.array("B", [0] * len(MSG))
4743n/a nbytes, addr = self.cli_conn.recvfrom_into(buf)
4744n/a self.assertEqual(nbytes, len(MSG))
4745n/a buf = buf.tobytes()
4746n/a msg = buf[:len(MSG)]
4747n/a self.assertEqual(msg, MSG)
4748n/a
4749n/a def _testRecvFromIntoArray(self):
4750n/a buf = bytes(MSG)
4751n/a self.serv_conn.send(buf)
4752n/a
4753n/a def testRecvFromIntoBytearray(self):
4754n/a buf = bytearray(1024)
4755n/a nbytes, addr = self.cli_conn.recvfrom_into(buf)
4756n/a self.assertEqual(nbytes, len(MSG))
4757n/a msg = buf[:len(MSG)]
4758n/a self.assertEqual(msg, MSG)
4759n/a
4760n/a _testRecvFromIntoBytearray = _testRecvFromIntoArray
4761n/a
4762n/a def testRecvFromIntoMemoryview(self):
4763n/a buf = bytearray(1024)
4764n/a nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
4765n/a self.assertEqual(nbytes, len(MSG))
4766n/a msg = buf[:len(MSG)]
4767n/a self.assertEqual(msg, MSG)
4768n/a
4769n/a _testRecvFromIntoMemoryview = _testRecvFromIntoArray
4770n/a
4771n/a def testRecvFromIntoSmallBuffer(self):
4772n/a # See issue #20246.
4773n/a buf = bytearray(8)
4774n/a self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
4775n/a
4776n/a def _testRecvFromIntoSmallBuffer(self):
4777n/a self.serv_conn.send(MSG)
4778n/a
4779n/a def testRecvFromIntoEmptyBuffer(self):
4780n/a buf = bytearray()
4781n/a self.cli_conn.recvfrom_into(buf)
4782n/a self.cli_conn.recvfrom_into(buf, 0)
4783n/a
4784n/a _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
4785n/a
4786n/a
4787n/aTIPC_STYPE = 2000
4788n/aTIPC_LOWER = 200
4789n/aTIPC_UPPER = 210
4790n/a
4791n/adef isTipcAvailable():
4792n/a """Check if the TIPC module is loaded
4793n/a
4794n/a The TIPC module is not loaded automatically on Ubuntu and probably
4795n/a other Linux distros.
4796n/a """
4797n/a if not hasattr(socket, "AF_TIPC"):
4798n/a return False
4799n/a try:
4800n/a f = open("/proc/modules")
4801n/a except (FileNotFoundError, IsADirectoryError, PermissionError):
4802n/a # It's ok if the file does not exist, is a directory or if we
4803n/a # have not the permission to read it.
4804n/a return False
4805n/a with f:
4806n/a for line in f:
4807n/a if line.startswith("tipc "):
4808n/a return True
4809n/a return False
4810n/a
4811n/a@unittest.skipUnless(isTipcAvailable(),
4812n/a "TIPC module is not loaded, please 'sudo modprobe tipc'")
4813n/aclass TIPCTest(unittest.TestCase):
4814n/a def testRDM(self):
4815n/a srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
4816n/a cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
4817n/a self.addCleanup(srv.close)
4818n/a self.addCleanup(cli.close)
4819n/a
4820n/a srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
4821n/a srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
4822n/a TIPC_LOWER, TIPC_UPPER)
4823n/a srv.bind(srvaddr)
4824n/a
4825n/a sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
4826n/a TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
4827n/a cli.sendto(MSG, sendaddr)
4828n/a
4829n/a msg, recvaddr = srv.recvfrom(1024)
4830n/a
4831n/a self.assertEqual(cli.getsockname(), recvaddr)
4832n/a self.assertEqual(msg, MSG)
4833n/a
4834n/a
4835n/a@unittest.skipUnless(isTipcAvailable(),
4836n/a "TIPC module is not loaded, please 'sudo modprobe tipc'")
4837n/aclass TIPCThreadableTest(unittest.TestCase, ThreadableTest):
4838n/a def __init__(self, methodName = 'runTest'):
4839n/a unittest.TestCase.__init__(self, methodName = methodName)
4840n/a ThreadableTest.__init__(self)
4841n/a
4842n/a def setUp(self):
4843n/a self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
4844n/a self.addCleanup(self.srv.close)
4845n/a self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
4846n/a srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
4847n/a TIPC_LOWER, TIPC_UPPER)
4848n/a self.srv.bind(srvaddr)
4849n/a self.srv.listen()
4850n/a self.serverExplicitReady()
4851n/a self.conn, self.connaddr = self.srv.accept()
4852n/a self.addCleanup(self.conn.close)
4853n/a
4854n/a def clientSetUp(self):
4855n/a # There is a hittable race between serverExplicitReady() and the
4856n/a # accept() call; sleep a little while to avoid it, otherwise
4857n/a # we could get an exception
4858n/a time.sleep(0.1)
4859n/a self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
4860n/a self.addCleanup(self.cli.close)
4861n/a addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
4862n/a TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
4863n/a self.cli.connect(addr)
4864n/a self.cliaddr = self.cli.getsockname()
4865n/a
4866n/a def testStream(self):
4867n/a msg = self.conn.recv(1024)
4868n/a self.assertEqual(msg, MSG)
4869n/a self.assertEqual(self.cliaddr, self.connaddr)
4870n/a
4871n/a def _testStream(self):
4872n/a self.cli.send(MSG)
4873n/a self.cli.close()
4874n/a
4875n/a
4876n/a@unittest.skipUnless(thread, 'Threading required for this test.')
4877n/aclass ContextManagersTest(ThreadedTCPSocketTest):
4878n/a
4879n/a def _testSocketClass(self):
4880n/a # base test
4881n/a with socket.socket() as sock:
4882n/a self.assertFalse(sock._closed)
4883n/a self.assertTrue(sock._closed)
4884n/a # close inside with block
4885n/a with socket.socket() as sock:
4886n/a sock.close()
4887n/a self.assertTrue(sock._closed)
4888n/a # exception inside with block
4889n/a with socket.socket() as sock:
4890n/a self.assertRaises(OSError, sock.sendall, b'foo')
4891n/a self.assertTrue(sock._closed)
4892n/a
4893n/a def testCreateConnectionBase(self):
4894n/a conn, addr = self.serv.accept()
4895n/a self.addCleanup(conn.close)
4896n/a data = conn.recv(1024)
4897n/a conn.sendall(data)
4898n/a
4899n/a def _testCreateConnectionBase(self):
4900n/a address = self.serv.getsockname()
4901n/a with socket.create_connection(address) as sock:
4902n/a self.assertFalse(sock._closed)
4903n/a sock.sendall(b'foo')
4904n/a self.assertEqual(sock.recv(1024), b'foo')
4905n/a self.assertTrue(sock._closed)
4906n/a
4907n/a def testCreateConnectionClose(self):
4908n/a conn, addr = self.serv.accept()
4909n/a self.addCleanup(conn.close)
4910n/a data = conn.recv(1024)
4911n/a conn.sendall(data)
4912n/a
4913n/a def _testCreateConnectionClose(self):
4914n/a address = self.serv.getsockname()
4915n/a with socket.create_connection(address) as sock:
4916n/a sock.close()
4917n/a self.assertTrue(sock._closed)
4918n/a self.assertRaises(OSError, sock.sendall, b'foo')
4919n/a
4920n/a
4921n/aclass InheritanceTest(unittest.TestCase):
4922n/a @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
4923n/a "SOCK_CLOEXEC not defined")
4924n/a @support.requires_linux_version(2, 6, 28)
4925n/a def test_SOCK_CLOEXEC(self):
4926n/a with socket.socket(socket.AF_INET,
4927n/a socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s:
4928n/a self.assertTrue(s.type & socket.SOCK_CLOEXEC)
4929n/a self.assertFalse(s.get_inheritable())
4930n/a
4931n/a def test_default_inheritable(self):
4932n/a sock = socket.socket()
4933n/a with sock:
4934n/a self.assertEqual(sock.get_inheritable(), False)
4935n/a
4936n/a def test_dup(self):
4937n/a sock = socket.socket()
4938n/a with sock:
4939n/a newsock = sock.dup()
4940n/a sock.close()
4941n/a with newsock:
4942n/a self.assertEqual(newsock.get_inheritable(), False)
4943n/a
4944n/a def test_set_inheritable(self):
4945n/a sock = socket.socket()
4946n/a with sock:
4947n/a sock.set_inheritable(True)
4948n/a self.assertEqual(sock.get_inheritable(), True)
4949n/a
4950n/a sock.set_inheritable(False)
4951n/a self.assertEqual(sock.get_inheritable(), False)
4952n/a
4953n/a @unittest.skipIf(fcntl is None, "need fcntl")
4954n/a def test_get_inheritable_cloexec(self):
4955n/a sock = socket.socket()
4956n/a with sock:
4957n/a fd = sock.fileno()
4958n/a self.assertEqual(sock.get_inheritable(), False)
4959n/a
4960n/a # clear FD_CLOEXEC flag
4961n/a flags = fcntl.fcntl(fd, fcntl.F_GETFD)
4962n/a flags &= ~fcntl.FD_CLOEXEC
4963n/a fcntl.fcntl(fd, fcntl.F_SETFD, flags)
4964n/a
4965n/a self.assertEqual(sock.get_inheritable(), True)
4966n/a
4967n/a @unittest.skipIf(fcntl is None, "need fcntl")
4968n/a def test_set_inheritable_cloexec(self):
4969n/a sock = socket.socket()
4970n/a with sock:
4971n/a fd = sock.fileno()
4972n/a self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
4973n/a fcntl.FD_CLOEXEC)
4974n/a
4975n/a sock.set_inheritable(True)
4976n/a self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
4977n/a 0)
4978n/a
4979n/a
4980n/a @unittest.skipUnless(hasattr(socket, "socketpair"),
4981n/a "need socket.socketpair()")
4982n/a def test_socketpair(self):
4983n/a s1, s2 = socket.socketpair()
4984n/a self.addCleanup(s1.close)
4985n/a self.addCleanup(s2.close)
4986n/a self.assertEqual(s1.get_inheritable(), False)
4987n/a self.assertEqual(s2.get_inheritable(), False)
4988n/a
4989n/a
4990n/a@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
4991n/a "SOCK_NONBLOCK not defined")
4992n/aclass NonblockConstantTest(unittest.TestCase):
4993n/a def checkNonblock(self, s, nonblock=True, timeout=0.0):
4994n/a if nonblock:
4995n/a self.assertTrue(s.type & socket.SOCK_NONBLOCK)
4996n/a self.assertEqual(s.gettimeout(), timeout)
4997n/a else:
4998n/a self.assertFalse(s.type & socket.SOCK_NONBLOCK)
4999n/a self.assertEqual(s.gettimeout(), None)
5000n/a
5001n/a @support.requires_linux_version(2, 6, 28)
5002n/a def test_SOCK_NONBLOCK(self):
5003n/a # a lot of it seems silly and redundant, but I wanted to test that
5004n/a # changing back and forth worked ok
5005n/a with socket.socket(socket.AF_INET,
5006n/a socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
5007n/a self.checkNonblock(s)
5008n/a s.setblocking(1)
5009n/a self.checkNonblock(s, False)
5010n/a s.setblocking(0)
5011n/a self.checkNonblock(s)
5012n/a s.settimeout(None)
5013n/a self.checkNonblock(s, False)
5014n/a s.settimeout(2.0)
5015n/a self.checkNonblock(s, timeout=2.0)
5016n/a s.setblocking(1)
5017n/a self.checkNonblock(s, False)
5018n/a # defaulttimeout
5019n/a t = socket.getdefaulttimeout()
5020n/a socket.setdefaulttimeout(0.0)
5021n/a with socket.socket() as s:
5022n/a self.checkNonblock(s)
5023n/a socket.setdefaulttimeout(None)
5024n/a with socket.socket() as s:
5025n/a self.checkNonblock(s, False)
5026n/a socket.setdefaulttimeout(2.0)
5027n/a with socket.socket() as s:
5028n/a self.checkNonblock(s, timeout=2.0)
5029n/a socket.setdefaulttimeout(None)
5030n/a with socket.socket() as s:
5031n/a self.checkNonblock(s, False)
5032n/a socket.setdefaulttimeout(t)
5033n/a
5034n/a
5035n/a@unittest.skipUnless(os.name == "nt", "Windows specific")
5036n/a@unittest.skipUnless(multiprocessing, "need multiprocessing")
5037n/aclass TestSocketSharing(SocketTCPTest):
5038n/a # This must be classmethod and not staticmethod or multiprocessing
5039n/a # won't be able to bootstrap it.
5040n/a @classmethod
5041n/a def remoteProcessServer(cls, q):
5042n/a # Recreate socket from shared data
5043n/a sdata = q.get()
5044n/a message = q.get()
5045n/a
5046n/a s = socket.fromshare(sdata)
5047n/a s2, c = s.accept()
5048n/a
5049n/a # Send the message
5050n/a s2.sendall(message)
5051n/a s2.close()
5052n/a s.close()
5053n/a
5054n/a def testShare(self):
5055n/a # Transfer the listening server socket to another process
5056n/a # and service it from there.
5057n/a
5058n/a # Create process:
5059n/a q = multiprocessing.Queue()
5060n/a p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
5061n/a p.start()
5062n/a
5063n/a # Get the shared socket data
5064n/a data = self.serv.share(p.pid)
5065n/a
5066n/a # Pass the shared socket to the other process
5067n/a addr = self.serv.getsockname()
5068n/a self.serv.close()
5069n/a q.put(data)
5070n/a
5071n/a # The data that the server will send us
5072n/a message = b"slapmahfro"
5073n/a q.put(message)
5074n/a
5075n/a # Connect
5076n/a s = socket.create_connection(addr)
5077n/a # listen for the data
5078n/a m = []
5079n/a while True:
5080n/a data = s.recv(100)
5081n/a if not data:
5082n/a break
5083n/a m.append(data)
5084n/a s.close()
5085n/a received = b"".join(m)
5086n/a self.assertEqual(received, message)
5087n/a p.join()
5088n/a
5089n/a def testShareLength(self):
5090n/a data = self.serv.share(os.getpid())
5091n/a self.assertRaises(ValueError, socket.fromshare, data[:-1])
5092n/a self.assertRaises(ValueError, socket.fromshare, data+b"foo")
5093n/a
5094n/a def compareSockets(self, org, other):
5095n/a # socket sharing is expected to work only for blocking socket
5096n/a # since the internal python timeout value isn't transferred.
5097n/a self.assertEqual(org.gettimeout(), None)
5098n/a self.assertEqual(org.gettimeout(), other.gettimeout())
5099n/a
5100n/a self.assertEqual(org.family, other.family)
5101n/a self.assertEqual(org.type, other.type)
5102n/a # If the user specified "0" for proto, then
5103n/a # internally windows will have picked the correct value.
5104n/a # Python introspection on the socket however will still return
5105n/a # 0. For the shared socket, the python value is recreated
5106n/a # from the actual value, so it may not compare correctly.
5107n/a if org.proto != 0:
5108n/a self.assertEqual(org.proto, other.proto)
5109n/a
5110n/a def testShareLocal(self):
5111n/a data = self.serv.share(os.getpid())
5112n/a s = socket.fromshare(data)
5113n/a try:
5114n/a self.compareSockets(self.serv, s)
5115n/a finally:
5116n/a s.close()
5117n/a
5118n/a def testTypes(self):
5119n/a families = [socket.AF_INET, socket.AF_INET6]
5120n/a types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
5121n/a for f in families:
5122n/a for t in types:
5123n/a try:
5124n/a source = socket.socket(f, t)
5125n/a except OSError:
5126n/a continue # This combination is not supported
5127n/a try:
5128n/a data = source.share(os.getpid())
5129n/a shared = socket.fromshare(data)
5130n/a try:
5131n/a self.compareSockets(source, shared)
5132n/a finally:
5133n/a shared.close()
5134n/a finally:
5135n/a source.close()
5136n/a
5137n/a
5138n/a@unittest.skipUnless(thread, 'Threading required for this test.')
5139n/aclass SendfileUsingSendTest(ThreadedTCPSocketTest):
5140n/a """
5141n/a Test the send() implementation of socket.sendfile().
5142n/a """
5143n/a
5144n/a FILESIZE = (10 * 1024 * 1024) # 10MB
5145n/a BUFSIZE = 8192
5146n/a FILEDATA = b""
5147n/a TIMEOUT = 2
5148n/a
5149n/a @classmethod
5150n/a def setUpClass(cls):
5151n/a def chunks(total, step):
5152n/a assert total >= step
5153n/a while total > step:
5154n/a yield step
5155n/a total -= step
5156n/a if total:
5157n/a yield total
5158n/a
5159n/a chunk = b"".join([random.choice(string.ascii_letters).encode()
5160n/a for i in range(cls.BUFSIZE)])
5161n/a with open(support.TESTFN, 'wb') as f:
5162n/a for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
5163n/a f.write(chunk)
5164n/a with open(support.TESTFN, 'rb') as f:
5165n/a cls.FILEDATA = f.read()
5166n/a assert len(cls.FILEDATA) == cls.FILESIZE
5167n/a
5168n/a @classmethod
5169n/a def tearDownClass(cls):
5170n/a support.unlink(support.TESTFN)
5171n/a
5172n/a def accept_conn(self):
5173n/a self.serv.settimeout(self.TIMEOUT)
5174n/a conn, addr = self.serv.accept()
5175n/a conn.settimeout(self.TIMEOUT)
5176n/a self.addCleanup(conn.close)
5177n/a return conn
5178n/a
5179n/a def recv_data(self, conn):
5180n/a received = []
5181n/a while True:
5182n/a chunk = conn.recv(self.BUFSIZE)
5183n/a if not chunk:
5184n/a break
5185n/a received.append(chunk)
5186n/a return b''.join(received)
5187n/a
5188n/a def meth_from_sock(self, sock):
5189n/a # Depending on the mixin class being run return either send()
5190n/a # or sendfile() method implementation.
5191n/a return getattr(sock, "_sendfile_use_send")
5192n/a
5193n/a # regular file
5194n/a
5195n/a def _testRegularFile(self):
5196n/a address = self.serv.getsockname()
5197n/a file = open(support.TESTFN, 'rb')
5198n/a with socket.create_connection(address) as sock, file as file:
5199n/a meth = self.meth_from_sock(sock)
5200n/a sent = meth(file)
5201n/a self.assertEqual(sent, self.FILESIZE)
5202n/a self.assertEqual(file.tell(), self.FILESIZE)
5203n/a
5204n/a def testRegularFile(self):
5205n/a conn = self.accept_conn()
5206n/a data = self.recv_data(conn)
5207n/a self.assertEqual(len(data), self.FILESIZE)
5208n/a self.assertEqual(data, self.FILEDATA)
5209n/a
5210n/a # non regular file
5211n/a
5212n/a def _testNonRegularFile(self):
5213n/a address = self.serv.getsockname()
5214n/a file = io.BytesIO(self.FILEDATA)
5215n/a with socket.create_connection(address) as sock, file as file:
5216n/a sent = sock.sendfile(file)
5217n/a self.assertEqual(sent, self.FILESIZE)
5218n/a self.assertEqual(file.tell(), self.FILESIZE)
5219n/a self.assertRaises(socket._GiveupOnSendfile,
5220n/a sock._sendfile_use_sendfile, file)
5221n/a
5222n/a def testNonRegularFile(self):
5223n/a conn = self.accept_conn()
5224n/a data = self.recv_data(conn)
5225n/a self.assertEqual(len(data), self.FILESIZE)
5226n/a self.assertEqual(data, self.FILEDATA)
5227n/a
5228n/a # empty file
5229n/a
5230n/a def _testEmptyFileSend(self):
5231n/a address = self.serv.getsockname()
5232n/a filename = support.TESTFN + "2"
5233n/a with open(filename, 'wb'):
5234n/a self.addCleanup(support.unlink, filename)
5235n/a file = open(filename, 'rb')
5236n/a with socket.create_connection(address) as sock, file as file:
5237n/a meth = self.meth_from_sock(sock)
5238n/a sent = meth(file)
5239n/a self.assertEqual(sent, 0)
5240n/a self.assertEqual(file.tell(), 0)
5241n/a
5242n/a def testEmptyFileSend(self):
5243n/a conn = self.accept_conn()
5244n/a data = self.recv_data(conn)
5245n/a self.assertEqual(data, b"")
5246n/a
5247n/a # offset
5248n/a
5249n/a def _testOffset(self):
5250n/a address = self.serv.getsockname()
5251n/a file = open(support.TESTFN, 'rb')
5252n/a with socket.create_connection(address) as sock, file as file:
5253n/a meth = self.meth_from_sock(sock)
5254n/a sent = meth(file, offset=5000)
5255n/a self.assertEqual(sent, self.FILESIZE - 5000)
5256n/a self.assertEqual(file.tell(), self.FILESIZE)
5257n/a
5258n/a def testOffset(self):
5259n/a conn = self.accept_conn()
5260n/a data = self.recv_data(conn)
5261n/a self.assertEqual(len(data), self.FILESIZE - 5000)
5262n/a self.assertEqual(data, self.FILEDATA[5000:])
5263n/a
5264n/a # count
5265n/a
5266n/a def _testCount(self):
5267n/a address = self.serv.getsockname()
5268n/a file = open(support.TESTFN, 'rb')
5269n/a with socket.create_connection(address, timeout=2) as sock, file as file:
5270n/a count = 5000007
5271n/a meth = self.meth_from_sock(sock)
5272n/a sent = meth(file, count=count)
5273n/a self.assertEqual(sent, count)
5274n/a self.assertEqual(file.tell(), count)
5275n/a
5276n/a def testCount(self):
5277n/a count = 5000007
5278n/a conn = self.accept_conn()
5279n/a data = self.recv_data(conn)
5280n/a self.assertEqual(len(data), count)
5281n/a self.assertEqual(data, self.FILEDATA[:count])
5282n/a
5283n/a # count small
5284n/a
5285n/a def _testCountSmall(self):
5286n/a address = self.serv.getsockname()
5287n/a file = open(support.TESTFN, 'rb')
5288n/a with socket.create_connection(address, timeout=2) as sock, file as file:
5289n/a count = 1
5290n/a meth = self.meth_from_sock(sock)
5291n/a sent = meth(file, count=count)
5292n/a self.assertEqual(sent, count)
5293n/a self.assertEqual(file.tell(), count)
5294n/a
5295n/a def testCountSmall(self):
5296n/a count = 1
5297n/a conn = self.accept_conn()
5298n/a data = self.recv_data(conn)
5299n/a self.assertEqual(len(data), count)
5300n/a self.assertEqual(data, self.FILEDATA[:count])
5301n/a
5302n/a # count + offset
5303n/a
5304n/a def _testCountWithOffset(self):
5305n/a address = self.serv.getsockname()
5306n/a file = open(support.TESTFN, 'rb')
5307n/a with socket.create_connection(address, timeout=2) as sock, file as file:
5308n/a count = 100007
5309n/a meth = self.meth_from_sock(sock)
5310n/a sent = meth(file, offset=2007, count=count)
5311n/a self.assertEqual(sent, count)
5312n/a self.assertEqual(file.tell(), count + 2007)
5313n/a
5314n/a def testCountWithOffset(self):
5315n/a count = 100007
5316n/a conn = self.accept_conn()
5317n/a data = self.recv_data(conn)
5318n/a self.assertEqual(len(data), count)
5319n/a self.assertEqual(data, self.FILEDATA[2007:count+2007])
5320n/a
5321n/a # non blocking sockets are not supposed to work
5322n/a
5323n/a def _testNonBlocking(self):
5324n/a address = self.serv.getsockname()
5325n/a file = open(support.TESTFN, 'rb')
5326n/a with socket.create_connection(address) as sock, file as file:
5327n/a sock.setblocking(False)
5328n/a meth = self.meth_from_sock(sock)
5329n/a self.assertRaises(ValueError, meth, file)
5330n/a self.assertRaises(ValueError, sock.sendfile, file)
5331n/a
5332n/a def testNonBlocking(self):
5333n/a conn = self.accept_conn()
5334n/a if conn.recv(8192):
5335n/a self.fail('was not supposed to receive any data')
5336n/a
5337n/a # timeout (non-triggered)
5338n/a
5339n/a def _testWithTimeout(self):
5340n/a address = self.serv.getsockname()
5341n/a file = open(support.TESTFN, 'rb')
5342n/a with socket.create_connection(address, timeout=2) as sock, file as file:
5343n/a meth = self.meth_from_sock(sock)
5344n/a sent = meth(file)
5345n/a self.assertEqual(sent, self.FILESIZE)
5346n/a
5347n/a def testWithTimeout(self):
5348n/a conn = self.accept_conn()
5349n/a data = self.recv_data(conn)
5350n/a self.assertEqual(len(data), self.FILESIZE)
5351n/a self.assertEqual(data, self.FILEDATA)
5352n/a
5353n/a # timeout (triggered)
5354n/a
5355n/a def _testWithTimeoutTriggeredSend(self):
5356n/a address = self.serv.getsockname()
5357n/a file = open(support.TESTFN, 'rb')
5358n/a with socket.create_connection(address, timeout=0.01) as sock, \
5359n/a file as file:
5360n/a meth = self.meth_from_sock(sock)
5361n/a self.assertRaises(socket.timeout, meth, file)
5362n/a
5363n/a def testWithTimeoutTriggeredSend(self):
5364n/a conn = self.accept_conn()
5365n/a conn.recv(88192)
5366n/a
5367n/a # errors
5368n/a
5369n/a def _test_errors(self):
5370n/a pass
5371n/a
5372n/a def test_errors(self):
5373n/a with open(support.TESTFN, 'rb') as file:
5374n/a with socket.socket(type=socket.SOCK_DGRAM) as s:
5375n/a meth = self.meth_from_sock(s)
5376n/a self.assertRaisesRegex(
5377n/a ValueError, "SOCK_STREAM", meth, file)
5378n/a with open(support.TESTFN, 'rt') as file:
5379n/a with socket.socket() as s:
5380n/a meth = self.meth_from_sock(s)
5381n/a self.assertRaisesRegex(
5382n/a ValueError, "binary mode", meth, file)
5383n/a with open(support.TESTFN, 'rb') as file:
5384n/a with socket.socket() as s:
5385n/a meth = self.meth_from_sock(s)
5386n/a self.assertRaisesRegex(TypeError, "positive integer",
5387n/a meth, file, count='2')
5388n/a self.assertRaisesRegex(TypeError, "positive integer",
5389n/a meth, file, count=0.1)
5390n/a self.assertRaisesRegex(ValueError, "positive integer",
5391n/a meth, file, count=0)
5392n/a self.assertRaisesRegex(ValueError, "positive integer",
5393n/a meth, file, count=-1)
5394n/a
5395n/a
5396n/a@unittest.skipUnless(thread, 'Threading required for this test.')
5397n/a@unittest.skipUnless(hasattr(os, "sendfile"),
5398n/a 'os.sendfile() required for this test.')
5399n/aclass SendfileUsingSendfileTest(SendfileUsingSendTest):
5400n/a """
5401n/a Test the sendfile() implementation of socket.sendfile().
5402n/a """
5403n/a def meth_from_sock(self, sock):
5404n/a return getattr(sock, "_sendfile_use_sendfile")
5405n/a
5406n/a
5407n/a@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
5408n/aclass LinuxKernelCryptoAPI(unittest.TestCase):
5409n/a # tests for AF_ALG
5410n/a def create_alg(self, typ, name):
5411n/a sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
5412n/a try:
5413n/a sock.bind((typ, name))
5414n/a except FileNotFoundError as e:
5415n/a # type / algorithm is not available
5416n/a sock.close()
5417n/a raise unittest.SkipTest(str(e), typ, name)
5418n/a else:
5419n/a return sock
5420n/a
5421n/a def test_sha256(self):
5422n/a expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
5423n/a "177a9cb410ff61f20015ad")
5424n/a with self.create_alg('hash', 'sha256') as algo:
5425n/a op, _ = algo.accept()
5426n/a with op:
5427n/a op.sendall(b"abc")
5428n/a self.assertEqual(op.recv(512), expected)
5429n/a
5430n/a op, _ = algo.accept()
5431n/a with op:
5432n/a op.send(b'a', socket.MSG_MORE)
5433n/a op.send(b'b', socket.MSG_MORE)
5434n/a op.send(b'c', socket.MSG_MORE)
5435n/a op.send(b'')
5436n/a self.assertEqual(op.recv(512), expected)
5437n/a
5438n/a def test_hmac_sha1(self):
5439n/a expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
5440n/a with self.create_alg('hash', 'hmac(sha1)') as algo:
5441n/a algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
5442n/a op, _ = algo.accept()
5443n/a with op:
5444n/a op.sendall(b"what do ya want for nothing?")
5445n/a self.assertEqual(op.recv(512), expected)
5446n/a
5447n/a # Although it should work with 3.19 and newer the test blocks on
5448n/a # Ubuntu 15.10 with Kernel 4.2.0-19.
5449n/a @support.requires_linux_version(4, 3)
5450n/a def test_aes_cbc(self):
5451n/a key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
5452n/a iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
5453n/a msg = b"Single block msg"
5454n/a ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
5455n/a msglen = len(msg)
5456n/a with self.create_alg('skcipher', 'cbc(aes)') as algo:
5457n/a algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
5458n/a op, _ = algo.accept()
5459n/a with op:
5460n/a op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
5461n/a flags=socket.MSG_MORE)
5462n/a op.sendall(msg)
5463n/a self.assertEqual(op.recv(msglen), ciphertext)
5464n/a
5465n/a op, _ = algo.accept()
5466n/a with op:
5467n/a op.sendmsg_afalg([ciphertext],
5468n/a op=socket.ALG_OP_DECRYPT, iv=iv)
5469n/a self.assertEqual(op.recv(msglen), msg)
5470n/a
5471n/a # long message
5472n/a multiplier = 1024
5473n/a longmsg = [msg] * multiplier
5474n/a op, _ = algo.accept()
5475n/a with op:
5476n/a op.sendmsg_afalg(longmsg,
5477n/a op=socket.ALG_OP_ENCRYPT, iv=iv)
5478n/a enc = op.recv(msglen * multiplier)
5479n/a self.assertEqual(len(enc), msglen * multiplier)
5480n/a self.assertTrue(enc[:msglen], ciphertext)
5481n/a
5482n/a op, _ = algo.accept()
5483n/a with op:
5484n/a op.sendmsg_afalg([enc],
5485n/a op=socket.ALG_OP_DECRYPT, iv=iv)
5486n/a dec = op.recv(msglen * multiplier)
5487n/a self.assertEqual(len(dec), msglen * multiplier)
5488n/a self.assertEqual(dec, msg * multiplier)
5489n/a
5490n/a @support.requires_linux_version(4, 3) # see test_aes_cbc
5491n/a def test_aead_aes_gcm(self):
5492n/a key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
5493n/a iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
5494n/a plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
5495n/a assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
5496n/a expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
5497n/a expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
5498n/a
5499n/a taglen = len(expected_tag)
5500n/a assoclen = len(assoc)
5501n/a
5502n/a with self.create_alg('aead', 'gcm(aes)') as algo:
5503n/a algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
5504n/a algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
5505n/a None, taglen)
5506n/a
5507n/a # send assoc, plain and tag buffer in separate steps
5508n/a op, _ = algo.accept()
5509n/a with op:
5510n/a op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
5511n/a assoclen=assoclen, flags=socket.MSG_MORE)
5512n/a op.sendall(assoc, socket.MSG_MORE)
5513n/a op.sendall(plain, socket.MSG_MORE)
5514n/a op.sendall(b'\x00' * taglen)
5515n/a res = op.recv(assoclen + len(plain) + taglen)
5516n/a self.assertEqual(expected_ct, res[assoclen:-taglen])
5517n/a self.assertEqual(expected_tag, res[-taglen:])
5518n/a
5519n/a # now with msg
5520n/a op, _ = algo.accept()
5521n/a with op:
5522n/a msg = assoc + plain + b'\x00' * taglen
5523n/a op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
5524n/a assoclen=assoclen)
5525n/a res = op.recv(assoclen + len(plain) + taglen)
5526n/a self.assertEqual(expected_ct, res[assoclen:-taglen])
5527n/a self.assertEqual(expected_tag, res[-taglen:])
5528n/a
5529n/a # create anc data manually
5530n/a pack_uint32 = struct.Struct('I').pack
5531n/a op, _ = algo.accept()
5532n/a with op:
5533n/a msg = assoc + plain + b'\x00' * taglen
5534n/a op.sendmsg(
5535n/a [msg],
5536n/a ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
5537n/a [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
5538n/a [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
5539n/a )
5540n/a )
5541n/a res = op.recv(len(msg))
5542n/a self.assertEqual(expected_ct, res[assoclen:-taglen])
5543n/a self.assertEqual(expected_tag, res[-taglen:])
5544n/a
5545n/a # decrypt and verify
5546n/a op, _ = algo.accept()
5547n/a with op:
5548n/a msg = assoc + expected_ct + expected_tag
5549n/a op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
5550n/a assoclen=assoclen)
5551n/a res = op.recv(len(msg))
5552n/a self.assertEqual(plain, res[assoclen:-taglen])
5553n/a
5554n/a @support.requires_linux_version(4, 3) # see test_aes_cbc
5555n/a def test_drbg_pr_sha256(self):
5556n/a # deterministic random bit generator, prediction resistance, sha256
5557n/a with self.create_alg('rng', 'drbg_pr_sha256') as algo:
5558n/a extra_seed = os.urandom(32)
5559n/a algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
5560n/a op, _ = algo.accept()
5561n/a with op:
5562n/a rn = op.recv(32)
5563n/a self.assertEqual(len(rn), 32)
5564n/a
5565n/a def test_sendmsg_afalg_args(self):
5566n/a sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
5567n/a with sock:
5568n/a with self.assertRaises(TypeError):
5569n/a sock.sendmsg_afalg()
5570n/a
5571n/a with self.assertRaises(TypeError):
5572n/a sock.sendmsg_afalg(op=None)
5573n/a
5574n/a with self.assertRaises(TypeError):
5575n/a sock.sendmsg_afalg(1)
5576n/a
5577n/a with self.assertRaises(TypeError):
5578n/a sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
5579n/a
5580n/a with self.assertRaises(TypeError):
5581n/a sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
5582n/a
5583n/a
5584n/adef test_main():
5585n/a tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
5586n/a TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest, UDPTimeoutTest ]
5587n/a
5588n/a tests.extend([
5589n/a NonBlockingTCPTests,
5590n/a FileObjectClassTestCase,
5591n/a UnbufferedFileObjectClassTestCase,
5592n/a LineBufferedFileObjectClassTestCase,
5593n/a SmallBufferedFileObjectClassTestCase,
5594n/a UnicodeReadFileObjectClassTestCase,
5595n/a UnicodeWriteFileObjectClassTestCase,
5596n/a UnicodeReadWriteFileObjectClassTestCase,
5597n/a NetworkConnectionNoServer,
5598n/a NetworkConnectionAttributesTest,
5599n/a NetworkConnectionBehaviourTest,
5600n/a ContextManagersTest,
5601n/a InheritanceTest,
5602n/a NonblockConstantTest
5603n/a ])
5604n/a tests.append(BasicSocketPairTest)
5605n/a tests.append(TestUnixDomain)
5606n/a tests.append(TestLinuxAbstractNamespace)
5607n/a tests.extend([TIPCTest, TIPCThreadableTest])
5608n/a tests.extend([BasicCANTest, CANTest])
5609n/a tests.extend([BasicRDSTest, RDSTest])
5610n/a tests.append(LinuxKernelCryptoAPI)
5611n/a tests.extend([
5612n/a CmsgMacroTests,
5613n/a SendmsgUDPTest,
5614n/a RecvmsgUDPTest,
5615n/a RecvmsgIntoUDPTest,
5616n/a SendmsgUDP6Test,
5617n/a RecvmsgUDP6Test,
5618n/a RecvmsgRFC3542AncillaryUDP6Test,
5619n/a RecvmsgIntoRFC3542AncillaryUDP6Test,
5620n/a RecvmsgIntoUDP6Test,
5621n/a SendmsgTCPTest,
5622n/a RecvmsgTCPTest,
5623n/a RecvmsgIntoTCPTest,
5624n/a SendmsgSCTPStreamTest,
5625n/a RecvmsgSCTPStreamTest,
5626n/a RecvmsgIntoSCTPStreamTest,
5627n/a SendmsgUnixStreamTest,
5628n/a RecvmsgUnixStreamTest,
5629n/a RecvmsgIntoUnixStreamTest,
5630n/a RecvmsgSCMRightsStreamTest,
5631n/a RecvmsgIntoSCMRightsStreamTest,
5632n/a # These are slow when setitimer() is not available
5633n/a InterruptedRecvTimeoutTest,
5634n/a InterruptedSendTimeoutTest,
5635n/a TestSocketSharing,
5636n/a SendfileUsingSendTest,
5637n/a SendfileUsingSendfileTest,
5638n/a ])
5639n/a
5640n/a thread_info = support.threading_setup()
5641n/a support.run_unittest(*tests)
5642n/a support.threading_cleanup(*thread_info)
5643n/a
5644n/aif __name__ == "__main__":
5645n/a test_main()