»Core Development>Code coverage>Lib/asyncio/streams.py

Python code coverage for Lib/asyncio/streams.py

#countcontent
1n/a"""Stream-related things."""
2n/a
3n/a__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
4n/a 'open_connection', 'start_server',
5n/a 'IncompleteReadError',
6n/a 'LimitOverrunError',
7n/a ]
8n/a
9n/aimport socket
10n/a
11n/aif hasattr(socket, 'AF_UNIX'):
12n/a __all__.extend(['open_unix_connection', 'start_unix_server'])
13n/a
14n/afrom . import coroutines
15n/afrom . import compat
16n/afrom . import events
17n/afrom . import protocols
18n/afrom .coroutines import coroutine
19n/afrom .log import logger
20n/a
21n/a
22n/a_DEFAULT_LIMIT = 2 ** 16
23n/a
24n/a
class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # BaseException pickles only self.args (here, the formatted
        # message), so a round-trip would call __init__(message) and fail
        # with a TypeError.  Recreate from the original two arguments.
        return type(self), (self.partial, self.expected)
37n/a
38n/a
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to be consumed bytes.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # BaseException pickles only self.args == (message,), so a
        # round-trip would call __init__(message) and fail with a
        # TypeError.  Recreate from both original arguments.
        return type(self), (self.args[0], self.consumed)
48n/a
49n/a
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    All the usual create_connection() arguments are accepted except
    protocol_factory; most common are the positional host and port,
    followed by various optional keyword arguments.

    Two extra optional keyword arguments are supported: loop (the event
    loop instance to use) and limit (the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = yield from loop.create_connection(
        lambda: protocol, host, port, **kwds)
    return reader, StreamWriter(transport, protocol, reader, loop)
78n/a
79n/a
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  It may be a
    plain callback function or a coroutine; a coroutine is automatically
    wrapped in a Task.

    The remaining arguments are the usual loop.create_server() arguments
    except protocol_factory; most common are the positional host and
    port, followed by various optional keyword arguments.

    Two extra optional keyword arguments are supported: loop (the event
    loop instance to use) and limit (the buffer limit passed to each
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        # One fresh reader/protocol pair per incoming connection.
        reader = StreamReader(limit=limit, loop=loop)
        return StreamReaderProtocol(reader, client_connected_cb, loop=loop)

    return (yield from loop.create_server(factory, host, port, **kwds))
114n/a
115n/a
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = yield from loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        return reader, StreamWriter(transport, protocol, reader, loop)

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()

        def factory():
            # One fresh reader/protocol pair per incoming connection.
            reader = StreamReader(limit=limit, loop=loop)
            return StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)

        return (yield from loop.create_unix_server(factory, path, **kwds))
146n/a
147n/a
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  Subclasses overriding any
    of these must call the super() implementation.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        self._loop = events.get_event_loop() if loop is None else loop
        self._paused = False
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Release anyone blocked in drain().
        fut = self._drain_waiter
        if fut is None:
            return
        self._drain_waiter = None
        if not fut.done():
            fut.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        fut = self._drain_waiter
        if fut is None:
            return
        self._drain_waiter = None
        if fut.done():
            return
        if exc is None:
            fut.set_result(None)
        else:
            fut.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        # Only one drain() may be pending at a time (modulo cancelled ones).
        assert self._drain_waiter is None or self._drain_waiter.cancelled()
        fut = self._loop.create_future()
        self._drain_waiter = fut
        yield from fut
212n/a
213n/a
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        cb = self._client_connected_cb
        if cb is None:
            return
        # Server mode: build the writer and invoke the user callback.
        self._stream_writer = StreamWriter(transport, self,
                                           self._stream_reader, self._loop)
        res = cb(self._stream_reader, self._stream_writer)
        if coroutines.iscoroutine(res):
            self._loop.create_task(res)

    def connection_lost(self, exc):
        reader = self._stream_reader
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        # Returning True keeps the transport open (half-open connection),
        # but over SSL that has no effect and SSLProtocol.eof_received
        # would emit a warning -- so return False in that case.
        return not self._over_ssl
263n/a
264n/a
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        parts = [self.__class__.__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            parts.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(parts)

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        transport = self._transport
        if transport is not None and transport.is_closing():
            # Yield to the event loop so connection_lost() may be
            # called.  Without this, _drain_helper() would return
            # immediately, and code that calls
            #     write(...); yield from drain()
            # in a loop would never call connection_lost(), so it
            # would not see an error when the socket is closed.
            yield
        yield from self._protocol._drain_helper()
334n/a
335n/a
class StreamReader:
    """Buffered stream reader fed by a protocol.

    Bytes arrive through feed_data()/feed_eof() (normally called by
    StreamReaderProtocol) and are consumed via the read*() coroutines.
    At most one read*() coroutine may be waiting for data at a time.
    """

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None  # Set later via set_transport()
        self._paused = False    # True while we asked the transport to pause

    def __repr__(self):
        # Compact one-line summary of the reader's current state.
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        """Return the exception set with set_exception(), or None."""
        return self._exception

    def set_exception(self, exc):
        """Store *exc* and wake any pending read*() coroutine with it."""
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wakeup read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        """Attach the transport used for flow control; may only be set once."""
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        # Resume reading once the buffer has drained back below the limit.
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        """Mark the stream as ended and wake any pending reader."""
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        """Append *data* to the buffer and wake any pending reader.

        Pauses the transport when the buffer exceeds twice the limit
        (the other half is drained by _maybe_resume_transport()).
        """
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have an unexpected behaviour.  It would not be possible to
        # know which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read chunk of data from the stream until newline (b'\\n') is found.

        On success, return chunk that ends with newline.  If only partial
        line can be read due to EOF, return incomplete line without
        terminating newline.  When EOF was reached while no bytes read, empty
        bytes object is returned.

        If limit is reached, ValueError will be raised.  In that case, if
        newline was found, complete line including newline will be removed
        from internal buffer.  Else, internal buffer will be cleared.  Limit
        is compared against part of the line without newline.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            # EOF before a newline: return whatever was read.
            return e.partial
        except LimitOverrunError as e:
            # Preserve readline()'s historical contract: raise ValueError
            # and discard the offending data from the buffer.
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line

    @coroutine
    def readuntil(self, separator=b'\n'):
        """Read data from the stream until ``separator`` is found.

        On success, the data and separator will be removed from the
        internal buffer (consumed).  Returned data will include the
        separator at the end.

        Configured stream limit is used to check result.  Limit sets the
        maximal length of data that can be returned, not counting the
        separator.

        If an EOF occurs and the complete separator is still not found,
        an IncompleteReadError exception will be raised, and the internal
        buffer will be reset.  The IncompleteReadError.partial attribute
        may contain the separator partially.

        If the data cannot be read because of over limit, a
        LimitOverrunError exception will be raised, and the data
        will be left in the internal buffer, so it can be read again.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last bytes, which length is
        # one less than seplen.  Let's check corner cases with
        # separator='SEPARATOR':
        # * we have received almost complete separator (without last
        #   byte).  i.e buffer='some textSEPARATO'.  In this case we
        #   can safely consume len(separator) - 1 bytes.
        # * last byte of buffer is first byte of separator, i.e.
        #   buffer='abcdefghijklmnopqrS'.  We may safely consume
        #   everything except that last byte, but this require to
        #   analyze bytes of buffer that match partial separator.
        #   This is slow and/or require FSM.  For this case our
        #   implementation is not optimal, since require rescanning
        #   of data that is known to not belong to separator.  In
        #   real world, separator will not be so long to notice
        #   performance problems.  Even when reading MIME-encoded
        #   messages :)

        # `offset` is the number of bytes from the beginning of the buffer
        # where there is no occurrence of `separator`.
        offset = 0

        # Loop until we find `separator` in the buffer, exceed the buffer size,
        # or an EOF has happened.
        while True:
            buflen = len(self._buffer)

            # Check if we now have enough data in the buffer for `separator`
            # to fit.
            if buflen - offset >= seplen:
                isep = self._buffer.find(separator, offset)

                if isep != -1:
                    # `separator` is in the buffer.  `isep` will be used
                    # later to retrieve the data.
                    break

                # see upper comment for explanation.
                offset = buflen + 1 - seplen
                if offset > self._limit:
                    raise LimitOverrunError(
                        'Separator is not found, and chunk exceed the limit',
                        offset)

            # Complete message (with full separator) may be present in buffer
            # even when EOF flag is set.  This may happen when the last chunk
            # adds data which makes separator be found.  That's why we check
            # for EOF *after* inspecting the buffer.
            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            # _wait_for_data() will resume reading if stream was paused.
            yield from self._wait_for_data('readuntil')

        if isep > self._limit:
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        self._maybe_resume_transport()
        return bytes(chunk)

    @coroutine
    def read(self, n=-1):
        """Read up to `n` bytes from the stream.

        If n is not provided, or set to -1, read until EOF and return all
        read bytes.  If the EOF was received and the internal buffer is
        empty, return an empty bytes object.

        If n is zero, return empty bytes object immediately.

        If n is positive, this function try to read `n` bytes, and may
        return less or equal bytes than requested, but at least one byte.
        If EOF was received before any byte is read, this function returns
        empty byte object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes.  So just call self.read(self._limit) until EOF.
            blocks = []
            while True:
                block = yield from self.read(self._limit)
                if not block:
                    break
                blocks.append(block)
            return b''.join(blocks)

        if not self._buffer and not self._eof:
            yield from self._wait_for_data('read')

        # This will work right even if buffer is less than n bytes
        data = bytes(self._buffer[:n])
        del self._buffer[:n]

        self._maybe_resume_transport()
        return data

    @coroutine
    def readexactly(self, n):
        """Read exactly `n` bytes.

        Raise an IncompleteReadError if EOF is reached before `n` bytes can
        be read.  The IncompleteReadError.partial attribute of the exception
        will contain the partial read bytes.

        If n is zero, return empty bytes object.

        Returned value is not limited with limit, configured at stream
        creation.

        If stream was paused, this function will automatically resume it if
        needed.
        """
        if n < 0:
            raise ValueError('readexactly size can not be less than zero')

        if self._exception is not None:
            raise self._exception

        if n == 0:
            return b''

        while len(self._buffer) < n:
            if self._eof:
                incomplete = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(incomplete, n)

            yield from self._wait_for_data('readexactly')

        if len(self._buffer) == n:
            # Fast path: hand over the whole buffer.
            data = bytes(self._buffer)
            self._buffer.clear()
        else:
            data = bytes(self._buffer[:n])
            del self._buffer[:n]
        self._maybe_resume_transport()
        return data

    if compat.PY35:
        @coroutine
        def __aiter__(self):
            return self

        @coroutine
        def __anext__(self):
            # Async iteration yields lines until EOF.
            val = yield from self.readline()
            if val == b'':
                raise StopAsyncIteration
            return val

        if compat.PY352:
            # In Python 3.5.2 and greater, __aiter__ should return
            # the asynchronous iterator directly.
            def __aiter__(self):
                return self