» Core Development > Code coverage > Lib/test/test_asynchat.py

Python code coverage for Lib/test/test_asynchat.py

# | count | content
1n/a# test asynchat
2n/a
3n/afrom test import support
4n/a
5n/a# If this fails, the test will be skipped.
6n/athread = support.import_module('_thread')
7n/a
8n/aimport asynchat
9n/aimport asyncore
10n/aimport errno
11n/aimport socket
12n/aimport sys
13n/aimport time
14n/aimport unittest
15n/aimport unittest.mock
16n/atry:
17n/a import threading
18n/aexcept ImportError:
19n/a threading = None
20n/a
# Address the echo client connects to (taken from test.support).
HOST = support.HOST
# Marker sent by the client; the echo server stops collecting data once
# this byte sequence shows up in what it has received.
SERVER_QUIT = b'QUIT\n'
# Seconds the tests wait when joining the echo server thread.
TIMEOUT = 3.0
24n/a
if threading:
    class echo_server(threading.Thread):
        """Single-connection echo server running in its own thread.

        The server accepts exactly one client, collects everything the
        client sends until SERVER_QUIT shows up (or the peer closes the
        connection), strips the quit marker, and then sends the collected
        data back ``chunk_size`` bytes at a time.
        """

        # parameter to determine the number of bytes passed back to the
        # client each send
        chunk_size = 1

        def __init__(self, event):
            """Create and bind the listening socket.

            *event* is set by run() once the socket is listening.
            """
            super().__init__()
            self.event = event
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(self.sock)
            # Data collected from the client.  Defined here so that tests
            # can always read it, even if run() failed before receiving.
            self.buffer = b""
            # This will be set if the client wants us to wait before echoing
            # data back.
            self.start_resend_event = None

        def run(self):
            """Accept one client, collect its data, then echo it back."""
            try:
                self.sock.listen()
                self.event.set()
                conn, _client = self.sock.accept()
                try:
                    self._collect_until_quit(conn)
                    if self.start_resend_event:
                        self.start_resend_event.wait()
                    self._echo_back(conn)
                finally:
                    conn.close()
            finally:
                # Always release the listening socket, even when accepting
                # or receiving failed.
                self.sock.close()

        def _collect_until_quit(self, conn):
            """Read from *conn* until SERVER_QUIT is seen or the peer closes."""
            self.buffer = b""
            # collect data until quit message is seen
            while SERVER_QUIT not in self.buffer:
                data = conn.recv(1)
                if not data:
                    break
                self.buffer += data

            # remove the SERVER_QUIT message
            self.buffer = self.buffer.replace(SERVER_QUIT, b'')

        def _echo_back(self, conn):
            """Re-send the entire set of collected data in small chunks."""
            try:
                # this may fail on some tests, such as test_close_when_done,
                # since the client closes the channel when it's done sending
                while self.buffer:
                    n = conn.send(self.buffer[:self.chunk_size])
                    time.sleep(0.001)
                    self.buffer = self.buffer[n:]
            except OSError:
                # Best effort only: the peer going away is expected here.
                pass

    class echo_client(asynchat.async_chat):
        """async_chat client that records every terminated message.

        Incoming data accumulates in ``buffer``; each time the terminator
        is found the buffer is appended to ``contents`` and reset.
        """

        def __init__(self, terminator, server_port):
            """Connect to (HOST, server_port) and install *terminator*."""
            super().__init__()
            self.contents = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((HOST, server_port))
            self.set_terminator(terminator)
            self.buffer = b""

        def handle_connect(self):
            pass

        if sys.platform == 'darwin':
            # select.poll returns a select.POLLHUP at the end of the tests
            # on darwin, so just ignore it
            def handle_expt(self):
                pass

        def collect_incoming_data(self, data):
            self.buffer += data

        def found_terminator(self):
            self.contents.append(self.buffer)
            self.buffer = b""

    def start_echo_server():
        """Start an echo_server thread and wait until it is listening.

        Returns the ``(server, event)`` pair; the event has already been
        cleared again when this function returns.
        """
        event = threading.Event()
        s = echo_server(event)
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        return s, event
106n/a
107n/a
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """Drive echo_client against a threaded echo_server.

    ``usepoll`` is forwarded to asyncore.loop() as ``use_poll``; subclasses
    may override it to run the same tests with the other polling mode.
    """
    usepoll = False

    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)

    def _run_loop(self):
        # Shared asyncore loop invocation used by every test.
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def _join_server(self, server):
        # join() returns None even on timeout, so is_alive() must be checked
        # afterwards to find out whether the server really finished.
        server.join(timeout=TIMEOUT)
        if server.is_alive():
            self.fail("join() timed out")

    def line_terminator_check(self, term, server_chunk):
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # an integer terminator of 1 must yield exactly the first byte
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message is ever completed: everything
        # echoed back must still be sitting in the client's buffer.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        self._run_loop()
        # Like every other test, fail loudly if the server never finished.
        self._join_server(s)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
272n/a
273n/a
# Re-runs every TestAsynchat test with usepoll switched on, so that
# asyncore.loop() is called with use_poll=True.
class TestAsynchat_WithPoll(TestAsynchat):
    usepoll = True
276n/a
277n/a
class TestAsynchatMocked(unittest.TestCase):
    """Tests that replace the socket with a mock instead of a real peer."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_socket = unittest.mock.Mock()
        fake_socket.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_socket)
        self.addCleanup(chat.del_channel)

        # handle_error is replaced so that we can tell whether the
        # BlockingIOError raised by recv() was reported as an error.
        patcher = unittest.mock.patch.object(chat, 'handle_error')
        with patcher as error_handler:
            chat.handle_read()
        self.assertFalse(error_handler.called)
291n/a
292n/a
class TestHelperFunctions(unittest.TestCase):
    """Checks for module-level helpers in asynchat."""

    def test_find_prefix_at_end(self):
        # (haystack, needle, expected result) triples
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)
297n/a
298n/a
class TestNotConnected(unittest.TestCase):
    """Checks on an async_chat object that was never given a socket."""

    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)
304n/a
305n/a
306n/a
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()