ยปCore Development>Code coverage>Lib/test/test_asyncore.py

Python code coverage for Lib/test/test_asyncore.py

#countcontent
1n/aimport asyncore
2n/aimport unittest
3n/aimport select
4n/aimport os
5n/aimport socket
6n/aimport sys
7n/aimport time
8n/aimport errno
9n/aimport struct
10n/a
11n/afrom test import support
12n/afrom io import BytesIO
13n/a
14n/aif support.PGO:
15n/a raise unittest.SkipTest("test is not helpful for PGO")
16n/a
17n/atry:
18n/a import threading
19n/aexcept ImportError:
20n/a threading = None
21n/a
22n/aTIMEOUT = 3
23n/aHAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')
24n/a
25n/aclass dummysocket:
26n/a def __init__(self):
27n/a self.closed = False
28n/a
29n/a def close(self):
30n/a self.closed = True
31n/a
32n/a def fileno(self):
33n/a return 42
34n/a
35n/aclass dummychannel:
36n/a def __init__(self):
37n/a self.socket = dummysocket()
38n/a
39n/a def close(self):
40n/a self.socket.close()
41n/a
42n/aclass exitingdummy:
43n/a def __init__(self):
44n/a pass
45n/a
46n/a def handle_read_event(self):
47n/a raise asyncore.ExitNow()
48n/a
49n/a handle_write_event = handle_read_event
50n/a handle_close = handle_read_event
51n/a handle_expt_event = handle_read_event
52n/a
53n/aclass crashingdummy:
54n/a def __init__(self):
55n/a self.error_handled = False
56n/a
57n/a def handle_read_event(self):
58n/a raise Exception()
59n/a
60n/a handle_write_event = handle_read_event
61n/a handle_close = handle_read_event
62n/a handle_expt_event = handle_read_event
63n/a
64n/a def handle_error(self):
65n/a self.error_handled = True
66n/a
67n/a# used when testing senders; just collects what it gets until newline is sent
68n/adef capture_server(evt, buf, serv):
69n/a try:
70n/a serv.listen()
71n/a conn, addr = serv.accept()
72n/a except socket.timeout:
73n/a pass
74n/a else:
75n/a n = 200
76n/a start = time.time()
77n/a while n > 0 and time.time() - start < 3.0:
78n/a r, w, e = select.select([conn], [], [], 0.1)
79n/a if r:
80n/a n -= 1
81n/a data = conn.recv(10)
82n/a # keep everything except for the newline terminator
83n/a buf.write(data.replace(b'\n', b''))
84n/a if b'\n' in data:
85n/a break
86n/a time.sleep(0.01)
87n/a
88n/a conn.close()
89n/a finally:
90n/a serv.close()
91n/a evt.set()
92n/a
93n/adef bind_af_aware(sock, addr):
94n/a """Helper function to bind a socket according to its family."""
95n/a if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
96n/a # Make sure the path doesn't exist.
97n/a support.unlink(addr)
98n/a support.bind_unix_socket(sock, addr)
99n/a else:
100n/a sock.bind(addr)
101n/a
102n/a
103n/aclass HelperFunctionTests(unittest.TestCase):
104n/a def test_readwriteexc(self):
105n/a # Check exception handling behavior of read, write and _exception
106n/a
107n/a # check that ExitNow exceptions in the object handler method
108n/a # bubbles all the way up through asyncore read/write/_exception calls
109n/a tr1 = exitingdummy()
110n/a self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
111n/a self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
112n/a self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
113n/a
114n/a # check that an exception other than ExitNow in the object handler
115n/a # method causes the handle_error method to get called
116n/a tr2 = crashingdummy()
117n/a asyncore.read(tr2)
118n/a self.assertEqual(tr2.error_handled, True)
119n/a
120n/a tr2 = crashingdummy()
121n/a asyncore.write(tr2)
122n/a self.assertEqual(tr2.error_handled, True)
123n/a
124n/a tr2 = crashingdummy()
125n/a asyncore._exception(tr2)
126n/a self.assertEqual(tr2.error_handled, True)
127n/a
128n/a # asyncore.readwrite uses constants in the select module that
129n/a # are not present in Windows systems (see this thread:
130n/a # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
131n/a # These constants should be present as long as poll is available
132n/a
133n/a @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
134n/a def test_readwrite(self):
135n/a # Check that correct methods are called by readwrite()
136n/a
137n/a attributes = ('read', 'expt', 'write', 'closed', 'error_handled')
138n/a
139n/a expected = (
140n/a (select.POLLIN, 'read'),
141n/a (select.POLLPRI, 'expt'),
142n/a (select.POLLOUT, 'write'),
143n/a (select.POLLERR, 'closed'),
144n/a (select.POLLHUP, 'closed'),
145n/a (select.POLLNVAL, 'closed'),
146n/a )
147n/a
148n/a class testobj:
149n/a def __init__(self):
150n/a self.read = False
151n/a self.write = False
152n/a self.closed = False
153n/a self.expt = False
154n/a self.error_handled = False
155n/a
156n/a def handle_read_event(self):
157n/a self.read = True
158n/a
159n/a def handle_write_event(self):
160n/a self.write = True
161n/a
162n/a def handle_close(self):
163n/a self.closed = True
164n/a
165n/a def handle_expt_event(self):
166n/a self.expt = True
167n/a
168n/a def handle_error(self):
169n/a self.error_handled = True
170n/a
171n/a for flag, expectedattr in expected:
172n/a tobj = testobj()
173n/a self.assertEqual(getattr(tobj, expectedattr), False)
174n/a asyncore.readwrite(tobj, flag)
175n/a
176n/a # Only the attribute modified by the routine we expect to be
177n/a # called should be True.
178n/a for attr in attributes:
179n/a self.assertEqual(getattr(tobj, attr), attr==expectedattr)
180n/a
181n/a # check that ExitNow exceptions in the object handler method
182n/a # bubbles all the way up through asyncore readwrite call
183n/a tr1 = exitingdummy()
184n/a self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
185n/a
186n/a # check that an exception other than ExitNow in the object handler
187n/a # method causes the handle_error method to get called
188n/a tr2 = crashingdummy()
189n/a self.assertEqual(tr2.error_handled, False)
190n/a asyncore.readwrite(tr2, flag)
191n/a self.assertEqual(tr2.error_handled, True)
192n/a
193n/a def test_closeall(self):
194n/a self.closeall_check(False)
195n/a
196n/a def test_closeall_default(self):
197n/a self.closeall_check(True)
198n/a
199n/a def closeall_check(self, usedefault):
200n/a # Check that close_all() closes everything in a given map
201n/a
202n/a l = []
203n/a testmap = {}
204n/a for i in range(10):
205n/a c = dummychannel()
206n/a l.append(c)
207n/a self.assertEqual(c.socket.closed, False)
208n/a testmap[i] = c
209n/a
210n/a if usedefault:
211n/a socketmap = asyncore.socket_map
212n/a try:
213n/a asyncore.socket_map = testmap
214n/a asyncore.close_all()
215n/a finally:
216n/a testmap, asyncore.socket_map = asyncore.socket_map, socketmap
217n/a else:
218n/a asyncore.close_all(testmap)
219n/a
220n/a self.assertEqual(len(testmap), 0)
221n/a
222n/a for c in l:
223n/a self.assertEqual(c.socket.closed, True)
224n/a
225n/a def test_compact_traceback(self):
226n/a try:
227n/a raise Exception("I don't like spam!")
228n/a except:
229n/a real_t, real_v, real_tb = sys.exc_info()
230n/a r = asyncore.compact_traceback()
231n/a else:
232n/a self.fail("Expected exception")
233n/a
234n/a (f, function, line), t, v, info = r
235n/a self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
236n/a self.assertEqual(function, 'test_compact_traceback')
237n/a self.assertEqual(t, real_t)
238n/a self.assertEqual(v, real_v)
239n/a self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
240n/a
241n/a
242n/aclass DispatcherTests(unittest.TestCase):
243n/a def setUp(self):
244n/a pass
245n/a
246n/a def tearDown(self):
247n/a asyncore.close_all()
248n/a
249n/a def test_basic(self):
250n/a d = asyncore.dispatcher()
251n/a self.assertEqual(d.readable(), True)
252n/a self.assertEqual(d.writable(), True)
253n/a
254n/a def test_repr(self):
255n/a d = asyncore.dispatcher()
256n/a self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
257n/a
258n/a def test_log(self):
259n/a d = asyncore.dispatcher()
260n/a
261n/a # capture output of dispatcher.log() (to stderr)
262n/a l1 = "Lovely spam! Wonderful spam!"
263n/a l2 = "I don't like spam!"
264n/a with support.captured_stderr() as stderr:
265n/a d.log(l1)
266n/a d.log(l2)
267n/a
268n/a lines = stderr.getvalue().splitlines()
269n/a self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
270n/a
271n/a def test_log_info(self):
272n/a d = asyncore.dispatcher()
273n/a
274n/a # capture output of dispatcher.log_info() (to stdout via print)
275n/a l1 = "Have you got anything without spam?"
276n/a l2 = "Why can't she have egg bacon spam and sausage?"
277n/a l3 = "THAT'S got spam in it!"
278n/a with support.captured_stdout() as stdout:
279n/a d.log_info(l1, 'EGGS')
280n/a d.log_info(l2)
281n/a d.log_info(l3, 'SPAM')
282n/a
283n/a lines = stdout.getvalue().splitlines()
284n/a expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
285n/a self.assertEqual(lines, expected)
286n/a
287n/a def test_unhandled(self):
288n/a d = asyncore.dispatcher()
289n/a d.ignore_log_types = ()
290n/a
291n/a # capture output of dispatcher.log_info() (to stdout via print)
292n/a with support.captured_stdout() as stdout:
293n/a d.handle_expt()
294n/a d.handle_read()
295n/a d.handle_write()
296n/a d.handle_connect()
297n/a
298n/a lines = stdout.getvalue().splitlines()
299n/a expected = ['warning: unhandled incoming priority event',
300n/a 'warning: unhandled read event',
301n/a 'warning: unhandled write event',
302n/a 'warning: unhandled connect event']
303n/a self.assertEqual(lines, expected)
304n/a
305n/a def test_strerror(self):
306n/a # refers to bug #8573
307n/a err = asyncore._strerror(errno.EPERM)
308n/a if hasattr(os, 'strerror'):
309n/a self.assertEqual(err, os.strerror(errno.EPERM))
310n/a err = asyncore._strerror(-1)
311n/a self.assertTrue(err != "")
312n/a
313n/a
314n/aclass dispatcherwithsend_noread(asyncore.dispatcher_with_send):
315n/a def readable(self):
316n/a return False
317n/a
318n/a def handle_connect(self):
319n/a pass
320n/a
321n/a
322n/aclass DispatcherWithSendTests(unittest.TestCase):
323n/a def setUp(self):
324n/a pass
325n/a
326n/a def tearDown(self):
327n/a asyncore.close_all()
328n/a
329n/a @unittest.skipUnless(threading, 'Threading required for this test.')
330n/a @support.reap_threads
331n/a def test_send(self):
332n/a evt = threading.Event()
333n/a sock = socket.socket()
334n/a sock.settimeout(3)
335n/a port = support.bind_port(sock)
336n/a
337n/a cap = BytesIO()
338n/a args = (evt, cap, sock)
339n/a t = threading.Thread(target=capture_server, args=args)
340n/a t.start()
341n/a try:
342n/a # wait a little longer for the server to initialize (it sometimes
343n/a # refuses connections on slow machines without this wait)
344n/a time.sleep(0.2)
345n/a
346n/a data = b"Suppose there isn't a 16-ton weight?"
347n/a d = dispatcherwithsend_noread()
348n/a d.create_socket()
349n/a d.connect((support.HOST, port))
350n/a
351n/a # give time for socket to connect
352n/a time.sleep(0.1)
353n/a
354n/a d.send(data)
355n/a d.send(data)
356n/a d.send(b'\n')
357n/a
358n/a n = 1000
359n/a while d.out_buffer and n > 0:
360n/a asyncore.poll()
361n/a n -= 1
362n/a
363n/a evt.wait()
364n/a
365n/a self.assertEqual(cap.getvalue(), data*2)
366n/a finally:
367n/a t.join(timeout=TIMEOUT)
368n/a if t.is_alive():
369n/a self.fail("join() timed out")
370n/a
371n/a
372n/a@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
373n/a 'asyncore.file_wrapper required')
374n/aclass FileWrapperTest(unittest.TestCase):
375n/a def setUp(self):
376n/a self.d = b"It's not dead, it's sleeping!"
377n/a with open(support.TESTFN, 'wb') as file:
378n/a file.write(self.d)
379n/a
380n/a def tearDown(self):
381n/a support.unlink(support.TESTFN)
382n/a
383n/a def test_recv(self):
384n/a fd = os.open(support.TESTFN, os.O_RDONLY)
385n/a w = asyncore.file_wrapper(fd)
386n/a os.close(fd)
387n/a
388n/a self.assertNotEqual(w.fd, fd)
389n/a self.assertNotEqual(w.fileno(), fd)
390n/a self.assertEqual(w.recv(13), b"It's not dead")
391n/a self.assertEqual(w.read(6), b", it's")
392n/a w.close()
393n/a self.assertRaises(OSError, w.read, 1)
394n/a
395n/a def test_send(self):
396n/a d1 = b"Come again?"
397n/a d2 = b"I want to buy some cheese."
398n/a fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
399n/a w = asyncore.file_wrapper(fd)
400n/a os.close(fd)
401n/a
402n/a w.write(d1)
403n/a w.send(d2)
404n/a w.close()
405n/a with open(support.TESTFN, 'rb') as file:
406n/a self.assertEqual(file.read(), self.d + d1 + d2)
407n/a
408n/a @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
409n/a 'asyncore.file_dispatcher required')
410n/a def test_dispatcher(self):
411n/a fd = os.open(support.TESTFN, os.O_RDONLY)
412n/a data = []
413n/a class FileDispatcher(asyncore.file_dispatcher):
414n/a def handle_read(self):
415n/a data.append(self.recv(29))
416n/a s = FileDispatcher(fd)
417n/a os.close(fd)
418n/a asyncore.loop(timeout=0.01, use_poll=True, count=2)
419n/a self.assertEqual(b"".join(data), self.d)
420n/a
421n/a def test_resource_warning(self):
422n/a # Issue #11453
423n/a fd = os.open(support.TESTFN, os.O_RDONLY)
424n/a f = asyncore.file_wrapper(fd)
425n/a
426n/a os.close(fd)
427n/a with support.check_warnings(('', ResourceWarning)):
428n/a f = None
429n/a support.gc_collect()
430n/a
431n/a def test_close_twice(self):
432n/a fd = os.open(support.TESTFN, os.O_RDONLY)
433n/a f = asyncore.file_wrapper(fd)
434n/a os.close(fd)
435n/a
436n/a f.close()
437n/a self.assertEqual(f.fd, -1)
438n/a # calling close twice should not fail
439n/a f.close()
440n/a
441n/a
442n/aclass BaseTestHandler(asyncore.dispatcher):
443n/a
444n/a def __init__(self, sock=None):
445n/a asyncore.dispatcher.__init__(self, sock)
446n/a self.flag = False
447n/a
448n/a def handle_accept(self):
449n/a raise Exception("handle_accept not supposed to be called")
450n/a
451n/a def handle_accepted(self):
452n/a raise Exception("handle_accepted not supposed to be called")
453n/a
454n/a def handle_connect(self):
455n/a raise Exception("handle_connect not supposed to be called")
456n/a
457n/a def handle_expt(self):
458n/a raise Exception("handle_expt not supposed to be called")
459n/a
460n/a def handle_close(self):
461n/a raise Exception("handle_close not supposed to be called")
462n/a
463n/a def handle_error(self):
464n/a raise
465n/a
466n/a
467n/aclass BaseServer(asyncore.dispatcher):
468n/a """A server which listens on an address and dispatches the
469n/a connection to a handler.
470n/a """
471n/a
472n/a def __init__(self, family, addr, handler=BaseTestHandler):
473n/a asyncore.dispatcher.__init__(self)
474n/a self.create_socket(family)
475n/a self.set_reuse_addr()
476n/a bind_af_aware(self.socket, addr)
477n/a self.listen(5)
478n/a self.handler = handler
479n/a
480n/a @property
481n/a def address(self):
482n/a return self.socket.getsockname()
483n/a
484n/a def handle_accepted(self, sock, addr):
485n/a self.handler(sock)
486n/a
487n/a def handle_error(self):
488n/a raise
489n/a
490n/a
491n/aclass BaseClient(BaseTestHandler):
492n/a
493n/a def __init__(self, family, address):
494n/a BaseTestHandler.__init__(self)
495n/a self.create_socket(family)
496n/a self.connect(address)
497n/a
498n/a def handle_connect(self):
499n/a pass
500n/a
501n/a
502n/aclass BaseTestAPI:
503n/a
504n/a def tearDown(self):
505n/a asyncore.close_all()
506n/a
507n/a def loop_waiting_for_flag(self, instance, timeout=5):
508n/a timeout = float(timeout) / 100
509n/a count = 100
510n/a while asyncore.socket_map and count > 0:
511n/a asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
512n/a if instance.flag:
513n/a return
514n/a count -= 1
515n/a time.sleep(timeout)
516n/a self.fail("flag not set")
517n/a
518n/a def test_handle_connect(self):
519n/a # make sure handle_connect is called on connect()
520n/a
521n/a class TestClient(BaseClient):
522n/a def handle_connect(self):
523n/a self.flag = True
524n/a
525n/a server = BaseServer(self.family, self.addr)
526n/a client = TestClient(self.family, server.address)
527n/a self.loop_waiting_for_flag(client)
528n/a
529n/a def test_handle_accept(self):
530n/a # make sure handle_accept() is called when a client connects
531n/a
532n/a class TestListener(BaseTestHandler):
533n/a
534n/a def __init__(self, family, addr):
535n/a BaseTestHandler.__init__(self)
536n/a self.create_socket(family)
537n/a bind_af_aware(self.socket, addr)
538n/a self.listen(5)
539n/a self.address = self.socket.getsockname()
540n/a
541n/a def handle_accept(self):
542n/a self.flag = True
543n/a
544n/a server = TestListener(self.family, self.addr)
545n/a client = BaseClient(self.family, server.address)
546n/a self.loop_waiting_for_flag(server)
547n/a
548n/a def test_handle_accepted(self):
549n/a # make sure handle_accepted() is called when a client connects
550n/a
551n/a class TestListener(BaseTestHandler):
552n/a
553n/a def __init__(self, family, addr):
554n/a BaseTestHandler.__init__(self)
555n/a self.create_socket(family)
556n/a bind_af_aware(self.socket, addr)
557n/a self.listen(5)
558n/a self.address = self.socket.getsockname()
559n/a
560n/a def handle_accept(self):
561n/a asyncore.dispatcher.handle_accept(self)
562n/a
563n/a def handle_accepted(self, sock, addr):
564n/a sock.close()
565n/a self.flag = True
566n/a
567n/a server = TestListener(self.family, self.addr)
568n/a client = BaseClient(self.family, server.address)
569n/a self.loop_waiting_for_flag(server)
570n/a
571n/a
572n/a def test_handle_read(self):
573n/a # make sure handle_read is called on data received
574n/a
575n/a class TestClient(BaseClient):
576n/a def handle_read(self):
577n/a self.flag = True
578n/a
579n/a class TestHandler(BaseTestHandler):
580n/a def __init__(self, conn):
581n/a BaseTestHandler.__init__(self, conn)
582n/a self.send(b'x' * 1024)
583n/a
584n/a server = BaseServer(self.family, self.addr, TestHandler)
585n/a client = TestClient(self.family, server.address)
586n/a self.loop_waiting_for_flag(client)
587n/a
588n/a def test_handle_write(self):
589n/a # make sure handle_write is called
590n/a
591n/a class TestClient(BaseClient):
592n/a def handle_write(self):
593n/a self.flag = True
594n/a
595n/a server = BaseServer(self.family, self.addr)
596n/a client = TestClient(self.family, server.address)
597n/a self.loop_waiting_for_flag(client)
598n/a
599n/a def test_handle_close(self):
600n/a # make sure handle_close is called when the other end closes
601n/a # the connection
602n/a
603n/a class TestClient(BaseClient):
604n/a
605n/a def handle_read(self):
606n/a # in order to make handle_close be called we are supposed
607n/a # to make at least one recv() call
608n/a self.recv(1024)
609n/a
610n/a def handle_close(self):
611n/a self.flag = True
612n/a self.close()
613n/a
614n/a class TestHandler(BaseTestHandler):
615n/a def __init__(self, conn):
616n/a BaseTestHandler.__init__(self, conn)
617n/a self.close()
618n/a
619n/a server = BaseServer(self.family, self.addr, TestHandler)
620n/a client = TestClient(self.family, server.address)
621n/a self.loop_waiting_for_flag(client)
622n/a
623n/a def test_handle_close_after_conn_broken(self):
624n/a # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
625n/a # #11265).
626n/a
627n/a data = b'\0' * 128
628n/a
629n/a class TestClient(BaseClient):
630n/a
631n/a def handle_write(self):
632n/a self.send(data)
633n/a
634n/a def handle_close(self):
635n/a self.flag = True
636n/a self.close()
637n/a
638n/a def handle_expt(self):
639n/a self.flag = True
640n/a self.close()
641n/a
642n/a class TestHandler(BaseTestHandler):
643n/a
644n/a def handle_read(self):
645n/a self.recv(len(data))
646n/a self.close()
647n/a
648n/a def writable(self):
649n/a return False
650n/a
651n/a server = BaseServer(self.family, self.addr, TestHandler)
652n/a client = TestClient(self.family, server.address)
653n/a self.loop_waiting_for_flag(client)
654n/a
655n/a @unittest.skipIf(sys.platform.startswith("sunos"),
656n/a "OOB support is broken on Solaris")
657n/a def test_handle_expt(self):
658n/a # Make sure handle_expt is called on OOB data received.
659n/a # Note: this might fail on some platforms as OOB data is
660n/a # tenuously supported and rarely used.
661n/a if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
662n/a self.skipTest("Not applicable to AF_UNIX sockets.")
663n/a
664n/a class TestClient(BaseClient):
665n/a def handle_expt(self):
666n/a self.socket.recv(1024, socket.MSG_OOB)
667n/a self.flag = True
668n/a
669n/a class TestHandler(BaseTestHandler):
670n/a def __init__(self, conn):
671n/a BaseTestHandler.__init__(self, conn)
672n/a self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
673n/a
674n/a server = BaseServer(self.family, self.addr, TestHandler)
675n/a client = TestClient(self.family, server.address)
676n/a self.loop_waiting_for_flag(client)
677n/a
678n/a def test_handle_error(self):
679n/a
680n/a class TestClient(BaseClient):
681n/a def handle_write(self):
682n/a 1.0 / 0
683n/a def handle_error(self):
684n/a self.flag = True
685n/a try:
686n/a raise
687n/a except ZeroDivisionError:
688n/a pass
689n/a else:
690n/a raise Exception("exception not raised")
691n/a
692n/a server = BaseServer(self.family, self.addr)
693n/a client = TestClient(self.family, server.address)
694n/a self.loop_waiting_for_flag(client)
695n/a
696n/a def test_connection_attributes(self):
697n/a server = BaseServer(self.family, self.addr)
698n/a client = BaseClient(self.family, server.address)
699n/a
700n/a # we start disconnected
701n/a self.assertFalse(server.connected)
702n/a self.assertTrue(server.accepting)
703n/a # this can't be taken for granted across all platforms
704n/a #self.assertFalse(client.connected)
705n/a self.assertFalse(client.accepting)
706n/a
707n/a # execute some loops so that client connects to server
708n/a asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
709n/a self.assertFalse(server.connected)
710n/a self.assertTrue(server.accepting)
711n/a self.assertTrue(client.connected)
712n/a self.assertFalse(client.accepting)
713n/a
714n/a # disconnect the client
715n/a client.close()
716n/a self.assertFalse(server.connected)
717n/a self.assertTrue(server.accepting)
718n/a self.assertFalse(client.connected)
719n/a self.assertFalse(client.accepting)
720n/a
721n/a # stop serving
722n/a server.close()
723n/a self.assertFalse(server.connected)
724n/a self.assertFalse(server.accepting)
725n/a
726n/a def test_create_socket(self):
727n/a s = asyncore.dispatcher()
728n/a s.create_socket(self.family)
729n/a self.assertEqual(s.socket.family, self.family)
730n/a SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
731n/a sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK
732n/a if hasattr(socket, 'SOCK_CLOEXEC'):
733n/a self.assertIn(s.socket.type,
734n/a (sock_type | socket.SOCK_CLOEXEC, sock_type))
735n/a else:
736n/a self.assertEqual(s.socket.type, sock_type)
737n/a
738n/a def test_bind(self):
739n/a if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
740n/a self.skipTest("Not applicable to AF_UNIX sockets.")
741n/a s1 = asyncore.dispatcher()
742n/a s1.create_socket(self.family)
743n/a s1.bind(self.addr)
744n/a s1.listen(5)
745n/a port = s1.socket.getsockname()[1]
746n/a
747n/a s2 = asyncore.dispatcher()
748n/a s2.create_socket(self.family)
749n/a # EADDRINUSE indicates the socket was correctly bound
750n/a self.assertRaises(OSError, s2.bind, (self.addr[0], port))
751n/a
752n/a def test_set_reuse_addr(self):
753n/a if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
754n/a self.skipTest("Not applicable to AF_UNIX sockets.")
755n/a sock = socket.socket(self.family)
756n/a try:
757n/a sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
758n/a except OSError:
759n/a unittest.skip("SO_REUSEADDR not supported on this platform")
760n/a else:
761n/a # if SO_REUSEADDR succeeded for sock we expect asyncore
762n/a # to do the same
763n/a s = asyncore.dispatcher(socket.socket(self.family))
764n/a self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
765n/a socket.SO_REUSEADDR))
766n/a s.socket.close()
767n/a s.create_socket(self.family)
768n/a s.set_reuse_addr()
769n/a self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
770n/a socket.SO_REUSEADDR))
771n/a finally:
772n/a sock.close()
773n/a
774n/a @unittest.skipUnless(threading, 'Threading required for this test.')
775n/a @support.reap_threads
776n/a def test_quick_connect(self):
777n/a # see: http://bugs.python.org/issue10340
778n/a if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
779n/a server = BaseServer(self.family, self.addr)
780n/a t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
781n/a count=500))
782n/a t.start()
783n/a def cleanup():
784n/a t.join(timeout=TIMEOUT)
785n/a if t.is_alive():
786n/a self.fail("join() timed out")
787n/a self.addCleanup(cleanup)
788n/a
789n/a s = socket.socket(self.family, socket.SOCK_STREAM)
790n/a s.settimeout(.2)
791n/a s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
792n/a struct.pack('ii', 1, 0))
793n/a try:
794n/a s.connect(server.address)
795n/a except OSError:
796n/a pass
797n/a finally:
798n/a s.close()
799n/a
800n/aclass TestAPI_UseIPv4Sockets(BaseTestAPI):
801n/a family = socket.AF_INET
802n/a addr = (support.HOST, 0)
803n/a
804n/a@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
805n/aclass TestAPI_UseIPv6Sockets(BaseTestAPI):
806n/a family = socket.AF_INET6
807n/a addr = (support.HOSTv6, 0)
808n/a
809n/a@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
810n/aclass TestAPI_UseUnixSockets(BaseTestAPI):
811n/a if HAS_UNIX_SOCKETS:
812n/a family = socket.AF_UNIX
813n/a addr = support.TESTFN
814n/a
815n/a def tearDown(self):
816n/a support.unlink(self.addr)
817n/a BaseTestAPI.tearDown(self)
818n/a
819n/aclass TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
820n/a use_poll = False
821n/a
822n/a@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
823n/aclass TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
824n/a use_poll = True
825n/a
826n/aclass TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
827n/a use_poll = False
828n/a
829n/a@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
830n/aclass TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
831n/a use_poll = True
832n/a
833n/aclass TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
834n/a use_poll = False
835n/a
836n/a@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
837n/aclass TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
838n/a use_poll = True
839n/a
840n/aif __name__ == "__main__":
841n/a unittest.main()