»Core Development>Code coverage>Lib/test/test_nntplib.py

Python code coverage for Lib/test/test_nntplib.py

1n/aimport io
2n/aimport socket
3n/aimport datetime
4n/aimport textwrap
5n/aimport unittest
6n/aimport functools
7n/aimport contextlib
8n/aimport os.path
9n/afrom test import support
10n/afrom nntplib import NNTP, GroupInfo
11n/aimport nntplib
12n/afrom unittest.mock import patch
14n/a import ssl
15n/aexcept ImportError:
16n/a ssl = None
18n/a import threading
19n/aexcept ImportError:
20n/a threading = None
22n/aTIMEOUT = 30
23n/acertfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem')
25n/a# TODO:
26n/a# - test the `file` arg to more commands
27n/a# - test error conditions
28n/a# - test auth and `usenetrc`
31n/aclass NetworkedNNTPTestsMixin:
33n/a def test_welcome(self):
34n/a welcome = self.server.getwelcome()
35n/a self.assertEqual(str, type(welcome))
37n/a def test_help(self):
38n/a resp, lines = self.server.help()
39n/a self.assertTrue(resp.startswith("100 "), resp)
40n/a for line in lines:
41n/a self.assertEqual(str, type(line))
43n/a def test_list(self):
44n/a resp, groups = self.server.list()
45n/a if len(groups) > 0:
46n/a self.assertEqual(GroupInfo, type(groups[0]))
47n/a self.assertEqual(str, type(groups[0].group))
49n/a def test_list_active(self):
50n/a resp, groups = self.server.list(self.GROUP_PAT)
51n/a if len(groups) > 0:
52n/a self.assertEqual(GroupInfo, type(groups[0]))
53n/a self.assertEqual(str, type(groups[0].group))
55n/a def test_unknown_command(self):
56n/a with self.assertRaises(nntplib.NNTPPermanentError) as cm:
57n/a self.server._shortcmd("XYZZY")
58n/a resp = cm.exception.response
59n/a self.assertTrue(resp.startswith("500 "), resp)
61n/a def test_newgroups(self):
62n/a # gmane gets a constant influx of new groups. In order not to stress
63n/a # the server too much, we choose a recent date in the past.
64n/a dt = datetime.date.today() - datetime.timedelta(days=7)
65n/a resp, groups = self.server.newgroups(dt)
66n/a if len(groups) > 0:
67n/a self.assertIsInstance(groups[0], GroupInfo)
68n/a self.assertIsInstance(groups[0].group, str)
70n/a def test_description(self):
71n/a def _check_desc(desc):
72n/a # Sanity checks
73n/a self.assertIsInstance(desc, str)
74n/a self.assertNotIn(self.GROUP_NAME, desc)
75n/a desc = self.server.description(self.GROUP_NAME)
76n/a _check_desc(desc)
77n/a # Another sanity check
78n/a self.assertIn("Python", desc)
79n/a # With a pattern
80n/a desc = self.server.description(self.GROUP_PAT)
81n/a _check_desc(desc)
82n/a # Shouldn't exist
83n/a desc = self.server.description("zk.brrtt.baz")
84n/a self.assertEqual(desc, '')
86n/a def test_descriptions(self):
87n/a resp, descs = self.server.descriptions(self.GROUP_PAT)
88n/a # 215 for LIST NEWSGROUPS, 282 for XGTITLE
89n/a self.assertTrue(
90n/a resp.startswith("215 ") or resp.startswith("282 "), resp)
91n/a self.assertIsInstance(descs, dict)
92n/a desc = descs[self.GROUP_NAME]
93n/a self.assertEqual(desc, self.server.description(self.GROUP_NAME))
95n/a def test_group(self):
96n/a result = self.server.group(self.GROUP_NAME)
97n/a self.assertEqual(5, len(result))
98n/a resp, count, first, last, group = result
99n/a self.assertEqual(group, self.GROUP_NAME)
100n/a self.assertIsInstance(count, int)
101n/a self.assertIsInstance(first, int)
102n/a self.assertIsInstance(last, int)
103n/a self.assertLessEqual(first, last)
104n/a self.assertTrue(resp.startswith("211 "), resp)
106n/a def test_date(self):
107n/a resp, date = self.server.date()
108n/a self.assertIsInstance(date, datetime.datetime)
109n/a # Sanity check
110n/a self.assertGreaterEqual(date.year, 1995)
111n/a self.assertLessEqual(date.year, 2030)
113n/a def _check_art_dict(self, art_dict):
114n/a # Some sanity checks for a field dictionary returned by OVER / XOVER
115n/a self.assertIsInstance(art_dict, dict)
116n/a # NNTP has 7 mandatory fields
117n/a self.assertGreaterEqual(art_dict.keys(),
118n/a {"subject", "from", "date", "message-id",
119n/a "references", ":bytes", ":lines"}
120n/a )
121n/a for v in art_dict.values():
122n/a self.assertIsInstance(v, (str, type(None)))
124n/a def test_xover(self):
125n/a resp, count, first, last, name = self.server.group(self.GROUP_NAME)
126n/a resp, lines = self.server.xover(last - 5, last)
127n/a if len(lines) == 0:
128n/a self.skipTest("no articles retrieved")
129n/a # The 'last' article is not necessarily part of the output (cancelled?)
130n/a art_num, art_dict = lines[0]
131n/a self.assertGreaterEqual(art_num, last - 5)
132n/a self.assertLessEqual(art_num, last)
133n/a self._check_art_dict(art_dict)
135n/a @unittest.skipIf(True, 'temporarily skipped until a permanent solution'
136n/a ' is found for issue #28971')
137n/a def test_over(self):
138n/a resp, count, first, last, name = self.server.group(self.GROUP_NAME)
139n/a start = last - 10
140n/a # The "start-" article range form
141n/a resp, lines = self.server.over((start, None))
142n/a art_num, art_dict = lines[0]
143n/a self._check_art_dict(art_dict)
144n/a # The "start-end" article range form
145n/a resp, lines = self.server.over((start, last))
146n/a art_num, art_dict = lines[-1]
147n/a # The 'last' article is not necessarily part of the output (cancelled?)
148n/a self.assertGreaterEqual(art_num, start)
149n/a self.assertLessEqual(art_num, last)
150n/a self._check_art_dict(art_dict)
151n/a # XXX The "message_id" form is unsupported by gmane
152n/a # 503 Overview by message-ID unsupported
154n/a def test_xhdr(self):
155n/a resp, count, first, last, name = self.server.group(self.GROUP_NAME)
156n/a resp, lines = self.server.xhdr('subject', last)
157n/a for line in lines:
158n/a self.assertEqual(str, type(line[1]))
160n/a def check_article_resp(self, resp, article, art_num=None):
161n/a self.assertIsInstance(article, nntplib.ArticleInfo)
162n/a if art_num is not None:
163n/a self.assertEqual(article.number, art_num)
164n/a for line in article.lines:
165n/a self.assertIsInstance(line, bytes)
166n/a # XXX this could exceptionally happen...
167n/a self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
169n/a def test_article_head_body(self):
170n/a resp, count, first, last, name = self.server.group(self.GROUP_NAME)
171n/a # Try to find an available article
172n/a for art_num in (last, first, last - 1):
173n/a try:
174n/a resp, head = self.server.head(art_num)
175n/a except nntplib.NNTPTemporaryError as e:
176n/a if not e.response.startswith("423 "):
177n/a raise
178n/a # "423 No such article" => choose another one
179n/a continue
180n/a break
181n/a else:
182n/a self.skipTest("could not find a suitable article number")
183n/a self.assertTrue(resp.startswith("221 "), resp)
184n/a self.check_article_resp(resp, head, art_num)
185n/a resp, body = self.server.body(art_num)
186n/a self.assertTrue(resp.startswith("222 "), resp)
187n/a self.check_article_resp(resp, body, art_num)
188n/a resp, article = self.server.article(art_num)
189n/a self.assertTrue(resp.startswith("220 "), resp)
190n/a self.check_article_resp(resp, article, art_num)
191n/a # Tolerate running the tests from behind a NNTP virus checker
192n/a blacklist = lambda line: line.startswith(b'X-Antivirus')
193n/a filtered_head_lines = [line for line in head.lines
194n/a if not blacklist(line)]
195n/a filtered_lines = [line for line in article.lines
196n/a if not blacklist(line)]
197n/a self.assertEqual(filtered_lines, filtered_head_lines + [b''] + body.lines)
199n/a def test_capabilities(self):
200n/a # The server under test implements NNTP version 2 and has a
201n/a # couple of well-known capabilities. Just sanity check that we
202n/a # got them.
203n/a def _check_caps(caps):
204n/a caps_list = caps['LIST']
205n/a self.assertIsInstance(caps_list, (list, tuple))
206n/a self.assertIn('OVERVIEW.FMT', caps_list)
207n/a self.assertGreaterEqual(self.server.nntp_version, 2)
208n/a _check_caps(self.server.getcapabilities())
209n/a # This re-emits the command
210n/a resp, caps = self.server.capabilities()
211n/a _check_caps(caps)
213n/a def test_zlogin(self):
214n/a # This test must be the penultimate because further commands will be
215n/a # refused.
216n/a baduser = "notarealuser"
217n/a badpw = "notarealpassword"
218n/a # Check that bogus credentials cause failure
219n/a self.assertRaises(nntplib.NNTPError, self.server.login,
220n/a user=baduser, password=badpw, usenetrc=False)
221n/a # FIXME: We should check that correct credentials succeed, but that
222n/a # would require valid details for some server somewhere to be in the
223n/a # test suite, I think. Gmane is anonymous, at least as used for the
224n/a # other tests.
226n/a def test_zzquit(self):
227n/a # This test must be called last, hence the name
228n/a cls = type(self)
229n/a try:
230n/a self.server.quit()
231n/a finally:
232n/a cls.server = None
234n/a @classmethod
235n/a def wrap_methods(cls):
236n/a # Wrap all methods in a transient_internet() exception catcher
237n/a # XXX put a generic version in test.support?
238n/a def wrap_meth(meth):
239n/a @functools.wraps(meth)
240n/a def wrapped(self):
241n/a with support.transient_internet(self.NNTP_HOST):
242n/a meth(self)
243n/a return wrapped
244n/a for name in dir(cls):
245n/a if not name.startswith('test_'):
246n/a continue
247n/a meth = getattr(cls, name)
248n/a if not callable(meth):
249n/a continue
250n/a # Need to use a closure so that meth remains bound to its current
251n/a # value
252n/a setattr(cls, name, wrap_meth(meth))
254n/a def test_with_statement(self):
255n/a def is_connected():
256n/a if not hasattr(server, 'file'):
257n/a return False
258n/a try:
259n/a server.help()
260n/a except (OSError, EOFError):
261n/a return False
262n/a return True
264n/a with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
265n/a self.assertTrue(is_connected())
266n/a self.assertTrue(server.help())
267n/a self.assertFalse(is_connected())
269n/a with self.NNTP_CLASS(self.NNTP_HOST, timeout=TIMEOUT, usenetrc=False) as server:
270n/a server.quit()
271n/a self.assertFalse(is_connected())
277n/aclass NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
278n/a # This server supports STARTTLS (gmane doesn't)
279n/a NNTP_HOST = 'news.trigofacile.com'
280n/a GROUP_NAME = 'fr.comp.lang.python'
281n/a GROUP_PAT = 'fr.comp.lang.*'
285n/a @classmethod
286n/a def setUpClass(cls):
287n/a support.requires("network")
288n/a with support.transient_internet(cls.NNTP_HOST):
289n/a cls.server = cls.NNTP_CLASS(cls.NNTP_HOST, timeout=TIMEOUT, usenetrc=False)
291n/a @classmethod
292n/a def tearDownClass(cls):
293n/a if cls.server is not None:
294n/a cls.server.quit()
296n/a@unittest.skipUnless(ssl, 'requires SSL support')
297n/aclass NetworkedNNTP_SSLTests(NetworkedNNTPTests):
299n/a # Technical limits for this public NNTP server (see http://www.aioe.org):
300n/a # "Only two concurrent connections per IP address are allowed and
301n/a # 400 connections per day are accepted from each IP address."
303n/a NNTP_HOST = 'nntp.aioe.org'
304n/a GROUP_NAME = 'comp.lang.python'
305n/a GROUP_PAT = 'comp.lang.*'
307n/a NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
309n/a # Disabled as it produces too much data
310n/a test_list = None
312n/a # Disabled as the connection will already be encrypted.
313n/a test_starttls = None
317n/a# Non-networked tests using a local server (or something mocking it).
320n/aclass _NNTPServerIO(io.RawIOBase):
321n/a """A raw IO object allowing NNTP commands to be received and processed
322n/a by a handler. The handler can push responses which can then be read
323n/a from the IO object."""
325n/a def __init__(self, handler):
326n/a io.RawIOBase.__init__(self)
327n/a # The channel from the client
328n/a self.c2s = io.BytesIO()
329n/a # The channel to the client
330n/a self.s2c = io.BytesIO()
331n/a self.handler = handler
332n/a self.handler.start(self.c2s.readline, self.push_data)
334n/a def readable(self):
335n/a return True
337n/a def writable(self):
338n/a return True
340n/a def push_data(self, data):
341n/a """Push (buffer) some data to send to the client."""
342n/a pos = self.s2c.tell()
343n/a self.s2c.seek(0, 2)
344n/a self.s2c.write(data)
345n/a self.s2c.seek(pos)
347n/a def write(self, b):
348n/a """The client sends us some data"""
349n/a pos = self.c2s.tell()
350n/a self.c2s.write(b)
351n/a self.c2s.seek(pos)
352n/a self.handler.process_pending()
353n/a return len(b)
355n/a def readinto(self, buf):
356n/a """The client wants to read a response"""
357n/a self.handler.process_pending()
358n/a b = self.s2c.read(len(buf))
359n/a n = len(b)
360n/a buf[:n] = b
361n/a return n
364n/adef make_mock_file(handler):
365n/a sio = _NNTPServerIO(handler)
366n/a # Using BufferedRWPair instead of BufferedRandom ensures the file
367n/a # isn't seekable.
368n/a file = io.BufferedRWPair(sio, sio)
369n/a return (sio, file)
372n/aclass MockedNNTPTestsMixin:
373n/a # Override in derived classes
374n/a handler_class = None
376n/a def setUp(self):
377n/a super().setUp()
378n/a self.make_server()
380n/a def tearDown(self):
381n/a super().tearDown()
382n/a del self.server
384n/a def make_server(self, *args, **kwargs):
385n/a self.handler = self.handler_class()
386n/a self.sio, file = make_mock_file(self.handler)
387n/a self.server = nntplib._NNTPBase(file, 'test.server', *args, **kwargs)
388n/a return self.server
391n/aclass MockedNNTPWithReaderModeMixin(MockedNNTPTestsMixin):
392n/a def setUp(self):
393n/a super().setUp()
394n/a self.make_server(readermode=True)
397n/aclass NNTPv1Handler:
398n/a """A handler for RFC 977"""
400n/a welcome = "200 NNTP mock server"
402n/a def start(self, readline, push_data):
403n/a self.in_body = False
404n/a self.allow_posting = True
405n/a self._readline = readline
406n/a self._push_data = push_data
407n/a self._logged_in = False
408n/a self._user_sent = False
409n/a # Our welcome
410n/a self.handle_welcome()
412n/a def _decode(self, data):
413n/a return str(data, "utf-8", "surrogateescape")
415n/a def process_pending(self):
416n/a if self.in_body:
417n/a while True:
418n/a line = self._readline()
419n/a if not line:
420n/a return
421n/a self.body.append(line)
422n/a if line == b".\r\n":
423n/a break
424n/a try:
425n/a meth, tokens = self.body_callback
426n/a meth(*tokens, body=self.body)
427n/a finally:
428n/a self.body_callback = None
429n/a self.body = None
430n/a self.in_body = False
431n/a while True:
432n/a line = self._decode(self._readline())
433n/a if not line:
434n/a return
435n/a if not line.endswith("\r\n"):
436n/a raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
437n/a line = line[:-2]
438n/a cmd, *tokens = line.split()
439n/a #meth = getattr(self.handler, "handle_" + cmd.upper(), None)
440n/a meth = getattr(self, "handle_" + cmd.upper(), None)
441n/a if meth is None:
442n/a self.handle_unknown()
443n/a else:
444n/a try:
445n/a meth(*tokens)
446n/a except Exception as e:
447n/a raise ValueError("command failed: {!r}".format(line)) from e
448n/a else:
449n/a if self.in_body:
450n/a self.body_callback = meth, tokens
451n/a self.body = []
453n/a def expect_body(self):
454n/a """Flag that the client is expected to post a request body"""
455n/a self.in_body = True
457n/a def push_data(self, data):
458n/a """Push some binary data"""
459n/a self._push_data(data)
461n/a def push_lit(self, lit):
462n/a """Push a string literal"""
463n/a lit = textwrap.dedent(lit)
464n/a lit = "\r\n".join(lit.splitlines()) + "\r\n"
465n/a lit = lit.encode('utf-8')
466n/a self.push_data(lit)
468n/a def handle_unknown(self):
469n/a self.push_lit("500 What?")
471n/a def handle_welcome(self):
472n/a self.push_lit(self.welcome)
474n/a def handle_QUIT(self):
475n/a self.push_lit("205 Bye!")
477n/a def handle_DATE(self):
478n/a self.push_lit("111 20100914001155")
480n/a def handle_GROUP(self, group):
481n/a if group == "fr.comp.lang.python":
482n/a self.push_lit("211 486 761 1265 fr.comp.lang.python")
483n/a else:
484n/a self.push_lit("411 No such group {}".format(group))
486n/a def handle_HELP(self):
487n/a self.push_lit("""\
488n/a 100 Legal commands
489n/a authinfo user Name|pass Password|generic <prog> <args>
490n/a date
491n/a help
492n/a Report problems to <root@example.org>
493n/a .""")
495n/a def handle_STAT(self, message_spec=None):
496n/a if message_spec is None:
497n/a self.push_lit("412 No newsgroup selected")
498n/a elif message_spec == "3000234":
499n/a self.push_lit("223 3000234 <45223423@example.com>")
500n/a elif message_spec == "<45223423@example.com>":
501n/a self.push_lit("223 0 <45223423@example.com>")
502n/a else:
503n/a self.push_lit("430 No Such Article Found")
505n/a def handle_NEXT(self):
506n/a self.push_lit("223 3000237 <668929@example.org> retrieved")
508n/a def handle_LAST(self):
509n/a self.push_lit("223 3000234 <45223423@example.com> retrieved")
511n/a def handle_LIST(self, action=None, param=None):
512n/a if action is None:
513n/a self.push_lit("""\
514n/a 215 Newsgroups in form "group high low flags".
515n/a comp.lang.python 0000052340 0000002828 y
516n/a comp.lang.python.announce 0000001153 0000000993 m
517n/a free.it.comp.lang.python 0000000002 0000000002 y
518n/a fr.comp.lang.python 0000001254 0000000760 y
519n/a free.it.comp.lang.python.learner 0000000000 0000000001 y
520n/a tw.bbs.comp.lang.python 0000000304 0000000304 y
521n/a .""")
522n/a elif action == "ACTIVE":
523n/a if param == "*distutils*":
524n/a self.push_lit("""\
525n/a 215 Newsgroups in form "group high low flags"
526n/a gmane.comp.python.distutils.devel 0000014104 0000000001 m
527n/a gmane.comp.python.distutils.cvs 0000000000 0000000001 m
528n/a .""")
529n/a else:
530n/a self.push_lit("""\
531n/a 215 Newsgroups in form "group high low flags"
532n/a .""")
533n/a elif action == "OVERVIEW.FMT":
534n/a self.push_lit("""\
535n/a 215 Order of fields in overview database.
536n/a Subject:
537n/a From:
538n/a Date:
539n/a Message-ID:
540n/a References:
541n/a Bytes:
542n/a Lines:
543n/a Xref:full
544n/a .""")
545n/a elif action == "NEWSGROUPS":
546n/a assert param is not None
547n/a if param == "comp.lang.python":
548n/a self.push_lit("""\
549n/a 215 Descriptions in form "group description".
550n/a comp.lang.python\tThe Python computer language.
551n/a .""")
552n/a elif param == "comp.lang.python*":
553n/a self.push_lit("""\
554n/a 215 Descriptions in form "group description".
555n/a comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
556n/a comp.lang.python\tThe Python computer language.
557n/a .""")
558n/a else:
559n/a self.push_lit("""\
560n/a 215 Descriptions in form "group description".
561n/a .""")
562n/a else:
563n/a self.push_lit('501 Unknown LIST keyword')
565n/a def handle_NEWNEWS(self, group, date_str, time_str):
566n/a # We hard code different return messages depending on passed
567n/a # argument and date syntax.
568n/a if (group == "comp.lang.python" and date_str == "20100913"
569n/a and time_str == "082004"):
570n/a # Date was passed in RFC 3977 format (NNTP "v2")
571n/a self.push_lit("""\
572n/a 230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
573n/a <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
574n/a <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
575n/a .""")
576n/a elif (group == "comp.lang.python" and date_str == "100913"
577n/a and time_str == "082004"):
578n/a # Date was passed in RFC 977 format (NNTP "v1")
579n/a self.push_lit("""\
580n/a 230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
581n/a <a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
582n/a <f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
583n/a .""")
584n/a elif (group == 'comp.lang.python' and
585n/a date_str in ('20100101', '100101') and
586n/a time_str == '090000'):
587n/a self.push_lit('too long line' * 3000 +
588n/a '\n.')
589n/a else:
590n/a self.push_lit("""\
591n/a 230 An empty list of newsarticles follows
592n/a .""")
593n/a # (Note for experiments: many servers disable NEWNEWS.
594n/a # As of this writing, sicinfo3.epfl.ch doesn't.)
596n/a def handle_XOVER(self, message_spec):
597n/a if message_spec == "57-59":
598n/a self.push_lit(
599n/a "224 Overview information for 57-58 follows\n"
600n/a "57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
601n/a "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
602n/a "\tSat, 19 Jun 2010 18:04:08 -0400"
603n/a "\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
604n/a "\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
605n/a "\tXref: news.gmane.org gmane.comp.python.authors:57"
606n/a "\n"
607n/a "58\tLooking for a few good bloggers"
608n/a "\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
609n/a "\tThu, 22 Jul 2010 09:14:14 -0400"
610n/a "\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
611n/a "\t\t6683\t16"
612n/a "\t"
613n/a "\n"
614n/a # A UTF-8 overview line from fr.comp.lang.python
615n/a "59\tRe: Message d'erreur incompréhensible (par moi)"
616n/a "\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
617n/a "\tWed, 15 Sep 2010 18:09:15 +0200"
618n/a "\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
619n/a "\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
620n/a "\tXref: saria.nerim.net fr.comp.lang.python:1265"
621n/a "\n"
622n/a ".\n")
623n/a else:
624n/a self.push_lit("""\
625n/a 224 No articles
626n/a .""")
628n/a def handle_POST(self, *, body=None):
629n/a if body is None:
630n/a if self.allow_posting:
631n/a self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
632n/a self.expect_body()
633n/a else:
634n/a self.push_lit("440 Posting not permitted")
635n/a else:
636n/a assert self.allow_posting
637n/a self.push_lit("240 Article received OK")
638n/a self.posted_body = body
640n/a def handle_IHAVE(self, message_id, *, body=None):
641n/a if body is None:
642n/a if (self.allow_posting and
643n/a message_id == "<i.am.an.article.you.will.want@example.com>"):
644n/a self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
645n/a self.expect_body()
646n/a else:
647n/a self.push_lit("435 Article not wanted")
648n/a else:
649n/a assert self.allow_posting
650n/a self.push_lit("235 Article transferred OK")
651n/a self.posted_body = body
653n/a sample_head = """\
654n/a From: "Demo User" <nobody@example.net>
655n/a Subject: I am just a test article
656n/a Content-Type: text/plain; charset=UTF-8; format=flowed
657n/a Message-ID: <i.am.an.article.you.will.want@example.com>"""
659n/a sample_body = """\
660n/a This is just a test article.
661n/a ..Here is a dot-starting line.
663n/a -- Signed by Andr\xe9."""
665n/a sample_article = sample_head + "\n\n" + sample_body
667n/a def handle_ARTICLE(self, message_spec=None):
668n/a if message_spec is None:
669n/a self.push_lit("220 3000237 <45223423@example.com>")
670n/a elif message_spec == "<45223423@example.com>":
671n/a self.push_lit("220 0 <45223423@example.com>")
672n/a elif message_spec == "3000234":
673n/a self.push_lit("220 3000234 <45223423@example.com>")
674n/a else:
675n/a self.push_lit("430 No Such Article Found")
676n/a return
677n/a self.push_lit(self.sample_article)
678n/a self.push_lit(".")
680n/a def handle_HEAD(self, message_spec=None):
681n/a if message_spec is None:
682n/a self.push_lit("221 3000237 <45223423@example.com>")
683n/a elif message_spec == "<45223423@example.com>":
684n/a self.push_lit("221 0 <45223423@example.com>")
685n/a elif message_spec == "3000234":
686n/a self.push_lit("221 3000234 <45223423@example.com>")
687n/a else:
688n/a self.push_lit("430 No Such Article Found")
689n/a return
690n/a self.push_lit(self.sample_head)
691n/a self.push_lit(".")
693n/a def handle_BODY(self, message_spec=None):
694n/a if message_spec is None:
695n/a self.push_lit("222 3000237 <45223423@example.com>")
696n/a elif message_spec == "<45223423@example.com>":
697n/a self.push_lit("222 0 <45223423@example.com>")
698n/a elif message_spec == "3000234":
699n/a self.push_lit("222 3000234 <45223423@example.com>")
700n/a else:
701n/a self.push_lit("430 No Such Article Found")
702n/a return
703n/a self.push_lit(self.sample_body)
704n/a self.push_lit(".")
706n/a def handle_AUTHINFO(self, cred_type, data):
707n/a if self._logged_in:
708n/a self.push_lit('502 Already Logged In')
709n/a elif cred_type == 'user':
710n/a if self._user_sent:
711n/a self.push_lit('482 User Credential Already Sent')
712n/a else:
713n/a self.push_lit('381 Password Required')
714n/a self._user_sent = True
715n/a elif cred_type == 'pass':
716n/a self.push_lit('281 Login Successful')
717n/a self._logged_in = True
718n/a else:
719n/a raise Exception('Unknown cred type {}'.format(cred_type))
722n/aclass NNTPv2Handler(NNTPv1Handler):
723n/a """A handler for RFC 3977 (NNTP "v2")"""
725n/a def handle_CAPABILITIES(self):
726n/a fmt = """\
727n/a 101 Capability list:
728n/a VERSION 2 3
730n/a HDR
732n/a OVER
733n/a POST
734n/a READER
735n/a ."""
737n/a if not self._logged_in:
738n/a self.push_lit(fmt.format('\n AUTHINFO USER'))
739n/a else:
740n/a self.push_lit(fmt.format(''))
742n/a def handle_MODE(self, _):
743n/a raise Exception('MODE READER sent despite READER has been advertised')
745n/a def handle_OVER(self, message_spec=None):
746n/a return self.handle_XOVER(message_spec)
749n/aclass CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
750n/a """A handler that allows CAPABILITIES only after login"""
752n/a def handle_CAPABILITIES(self):
753n/a if not self._logged_in:
754n/a self.push_lit('480 You must log in.')
755n/a else:
756n/a super().handle_CAPABILITIES()
759n/aclass ModeSwitchingNNTPv2Handler(NNTPv2Handler):
760n/a """A server that starts in transit mode"""
762n/a def __init__(self):
763n/a self._switched = False
765n/a def handle_CAPABILITIES(self):
766n/a fmt = """\
767n/a 101 Capability list:
768n/a VERSION 2 3
770n/a HDR
772n/a OVER
773n/a POST
774n/a {}READER
775n/a ."""
776n/a if self._switched:
777n/a self.push_lit(fmt.format(''))
778n/a else:
779n/a self.push_lit(fmt.format('MODE-'))
781n/a def handle_MODE(self, what):
782n/a assert not self._switched and what == 'reader'
783n/a self._switched = True
784n/a self.push_lit('200 Posting allowed')
787n/aclass NNTPv1v2TestsMixin:
789n/a def setUp(self):
790n/a super().setUp()
792n/a def test_welcome(self):
793n/a self.assertEqual(self.server.welcome, self.handler.welcome)
795n/a def test_authinfo(self):
796n/a if self.nntp_version == 2:
797n/a self.assertIn('AUTHINFO', self.server._caps)
798n/a self.server.login('testuser', 'testpw')
799n/a # if AUTHINFO is gone from _caps we also know that getcapabilities()
800n/a # has been called after login as it should
801n/a self.assertNotIn('AUTHINFO', self.server._caps)
803n/a def test_date(self):
804n/a resp, date = self.server.date()
805n/a self.assertEqual(resp, "111 20100914001155")
806n/a self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
808n/a def test_quit(self):
809n/a self.assertFalse(self.sio.closed)
810n/a resp = self.server.quit()
811n/a self.assertEqual(resp, "205 Bye!")
812n/a self.assertTrue(self.sio.closed)
814n/a def test_help(self):
815n/a resp, help = self.server.help()
816n/a self.assertEqual(resp, "100 Legal commands")
817n/a self.assertEqual(help, [
818n/a ' authinfo user Name|pass Password|generic <prog> <args>',
819n/a ' date',
820n/a ' help',
821n/a 'Report problems to <root@example.org>',
822n/a ])
824n/a def test_list(self):
825n/a resp, groups = self.server.list()
826n/a self.assertEqual(len(groups), 6)
827n/a g = groups[1]
828n/a self.assertEqual(g,
829n/a GroupInfo("comp.lang.python.announce", "0000001153",
830n/a "0000000993", "m"))
831n/a resp, groups = self.server.list("*distutils*")
832n/a self.assertEqual(len(groups), 2)
833n/a g = groups[0]
834n/a self.assertEqual(g,
835n/a GroupInfo("gmane.comp.python.distutils.devel", "0000014104",
836n/a "0000000001", "m"))
838n/a def test_stat(self):
839n/a resp, art_num, message_id = self.server.stat(3000234)
840n/a self.assertEqual(resp, "223 3000234 <45223423@example.com>")
841n/a self.assertEqual(art_num, 3000234)
842n/a self.assertEqual(message_id, "<45223423@example.com>")
843n/a resp, art_num, message_id = self.server.stat("<45223423@example.com>")
844n/a self.assertEqual(resp, "223 0 <45223423@example.com>")
845n/a self.assertEqual(art_num, 0)
846n/a self.assertEqual(message_id, "<45223423@example.com>")
847n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
848n/a self.server.stat("<non.existent.id>")
849n/a self.assertEqual(cm.exception.response, "430 No Such Article Found")
850n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
851n/a self.server.stat()
852n/a self.assertEqual(cm.exception.response, "412 No newsgroup selected")
854n/a def test_next(self):
855n/a resp, art_num, message_id = self.server.next()
856n/a self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
857n/a self.assertEqual(art_num, 3000237)
858n/a self.assertEqual(message_id, "<668929@example.org>")
860n/a def test_last(self):
861n/a resp, art_num, message_id = self.server.last()
862n/a self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
863n/a self.assertEqual(art_num, 3000234)
864n/a self.assertEqual(message_id, "<45223423@example.com>")
866n/a def test_description(self):
867n/a desc = self.server.description("comp.lang.python")
868n/a self.assertEqual(desc, "The Python computer language.")
869n/a desc = self.server.description("comp.lang.pythonx")
870n/a self.assertEqual(desc, "")
872n/a def test_descriptions(self):
873n/a resp, groups = self.server.descriptions("comp.lang.python")
874n/a self.assertEqual(resp, '215 Descriptions in form "group description".')
875n/a self.assertEqual(groups, {
876n/a "comp.lang.python": "The Python computer language.",
877n/a })
878n/a resp, groups = self.server.descriptions("comp.lang.python*")
879n/a self.assertEqual(groups, {
880n/a "comp.lang.python": "The Python computer language.",
881n/a "comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
882n/a })
883n/a resp, groups = self.server.descriptions("comp.lang.pythonx")
884n/a self.assertEqual(groups, {})
886n/a def test_group(self):
887n/a resp, count, first, last, group = self.server.group("fr.comp.lang.python")
888n/a self.assertTrue(resp.startswith("211 "), resp)
889n/a self.assertEqual(first, 761)
890n/a self.assertEqual(last, 1265)
891n/a self.assertEqual(count, 486)
892n/a self.assertEqual(group, "fr.comp.lang.python")
893n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
894n/a self.server.group("comp.lang.python.devel")
895n/a exc = cm.exception
896n/a self.assertTrue(exc.response.startswith("411 No such group"),
897n/a exc.response)
899n/a def test_newnews(self):
900n/a # NEWNEWS comp.lang.python [20]100913 082004
901n/a dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
902n/a resp, ids = self.server.newnews("comp.lang.python", dt)
903n/a expected = (
904n/a "230 list of newsarticles (NNTP v{0}) "
905n/a "created after Mon Sep 13 08:20:04 2010 follows"
906n/a ).format(self.nntp_version)
907n/a self.assertEqual(resp, expected)
908n/a self.assertEqual(ids, [
909n/a "<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
910n/a "<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
911n/a ])
912n/a # NEWNEWS fr.comp.lang.python [20]100913 082004
913n/a dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
914n/a resp, ids = self.server.newnews("fr.comp.lang.python", dt)
915n/a self.assertEqual(resp, "230 An empty list of newsarticles follows")
916n/a self.assertEqual(ids, [])
918n/a def _check_article_body(self, lines):
919n/a self.assertEqual(len(lines), 4)
920n/a self.assertEqual(lines[-1].decode('utf-8'), "-- Signed by André.")
921n/a self.assertEqual(lines[-2], b"")
922n/a self.assertEqual(lines[-3], b".Here is a dot-starting line.")
923n/a self.assertEqual(lines[-4], b"This is just a test article.")
925n/a def _check_article_head(self, lines):
926n/a self.assertEqual(len(lines), 4)
927n/a self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
928n/a self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
930n/a def _check_article_data(self, lines):
931n/a self.assertEqual(len(lines), 9)
932n/a self._check_article_head(lines[:4])
933n/a self._check_article_body(lines[-4:])
934n/a self.assertEqual(lines[4], b"")
936n/a def test_article(self):
937n/a # ARTICLE
938n/a resp, info = self.server.article()
939n/a self.assertEqual(resp, "220 3000237 <45223423@example.com>")
940n/a art_num, message_id, lines = info
941n/a self.assertEqual(art_num, 3000237)
942n/a self.assertEqual(message_id, "<45223423@example.com>")
943n/a self._check_article_data(lines)
944n/a # ARTICLE num
945n/a resp, info = self.server.article(3000234)
946n/a self.assertEqual(resp, "220 3000234 <45223423@example.com>")
947n/a art_num, message_id, lines = info
948n/a self.assertEqual(art_num, 3000234)
949n/a self.assertEqual(message_id, "<45223423@example.com>")
950n/a self._check_article_data(lines)
951n/a # ARTICLE id
952n/a resp, info = self.server.article("<45223423@example.com>")
953n/a self.assertEqual(resp, "220 0 <45223423@example.com>")
954n/a art_num, message_id, lines = info
955n/a self.assertEqual(art_num, 0)
956n/a self.assertEqual(message_id, "<45223423@example.com>")
957n/a self._check_article_data(lines)
958n/a # Non-existent id
959n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
960n/a self.server.article("<non-existent@example.com>")
961n/a self.assertEqual(cm.exception.response, "430 No Such Article Found")
963n/a def test_article_file(self):
964n/a # With a "file" argument
965n/a f = io.BytesIO()
966n/a resp, info = self.server.article(file=f)
967n/a self.assertEqual(resp, "220 3000237 <45223423@example.com>")
968n/a art_num, message_id, lines = info
969n/a self.assertEqual(art_num, 3000237)
970n/a self.assertEqual(message_id, "<45223423@example.com>")
971n/a self.assertEqual(lines, [])
972n/a data = f.getvalue()
973n/a self.assertTrue(data.startswith(
974n/a b'From: "Demo User" <nobody@example.net>\r\n'
975n/a b'Subject: I am just a test article\r\n'
976n/a ), ascii(data))
977n/a self.assertTrue(data.endswith(
978n/a b'This is just a test article.\r\n'
979n/a b'.Here is a dot-starting line.\r\n'
980n/a b'\r\n'
981n/a b'-- Signed by Andr\xc3\xa9.\r\n'
982n/a ), ascii(data))
984n/a def test_head(self):
985n/a # HEAD
986n/a resp, info = self.server.head()
987n/a self.assertEqual(resp, "221 3000237 <45223423@example.com>")
988n/a art_num, message_id, lines = info
989n/a self.assertEqual(art_num, 3000237)
990n/a self.assertEqual(message_id, "<45223423@example.com>")
991n/a self._check_article_head(lines)
992n/a # HEAD num
993n/a resp, info = self.server.head(3000234)
994n/a self.assertEqual(resp, "221 3000234 <45223423@example.com>")
995n/a art_num, message_id, lines = info
996n/a self.assertEqual(art_num, 3000234)
997n/a self.assertEqual(message_id, "<45223423@example.com>")
998n/a self._check_article_head(lines)
999n/a # HEAD id
1000n/a resp, info = self.server.head("<45223423@example.com>")
1001n/a self.assertEqual(resp, "221 0 <45223423@example.com>")
1002n/a art_num, message_id, lines = info
1003n/a self.assertEqual(art_num, 0)
1004n/a self.assertEqual(message_id, "<45223423@example.com>")
1005n/a self._check_article_head(lines)
1006n/a # Non-existent id
1007n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1008n/a self.server.head("<non-existent@example.com>")
1009n/a self.assertEqual(cm.exception.response, "430 No Such Article Found")
1011n/a def test_head_file(self):
1012n/a f = io.BytesIO()
1013n/a resp, info = self.server.head(file=f)
1014n/a self.assertEqual(resp, "221 3000237 <45223423@example.com>")
1015n/a art_num, message_id, lines = info
1016n/a self.assertEqual(art_num, 3000237)
1017n/a self.assertEqual(message_id, "<45223423@example.com>")
1018n/a self.assertEqual(lines, [])
1019n/a data = f.getvalue()
1020n/a self.assertTrue(data.startswith(
1021n/a b'From: "Demo User" <nobody@example.net>\r\n'
1022n/a b'Subject: I am just a test article\r\n'
1023n/a ), ascii(data))
1024n/a self.assertFalse(data.endswith(
1025n/a b'This is just a test article.\r\n'
1026n/a b'.Here is a dot-starting line.\r\n'
1027n/a b'\r\n'
1028n/a b'-- Signed by Andr\xc3\xa9.\r\n'
1029n/a ), ascii(data))
1031n/a def test_body(self):
1032n/a # BODY
1033n/a resp, info = self.server.body()
1034n/a self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1035n/a art_num, message_id, lines = info
1036n/a self.assertEqual(art_num, 3000237)
1037n/a self.assertEqual(message_id, "<45223423@example.com>")
1038n/a self._check_article_body(lines)
1039n/a # BODY num
1040n/a resp, info = self.server.body(3000234)
1041n/a self.assertEqual(resp, "222 3000234 <45223423@example.com>")
1042n/a art_num, message_id, lines = info
1043n/a self.assertEqual(art_num, 3000234)
1044n/a self.assertEqual(message_id, "<45223423@example.com>")
1045n/a self._check_article_body(lines)
1046n/a # BODY id
1047n/a resp, info = self.server.body("<45223423@example.com>")
1048n/a self.assertEqual(resp, "222 0 <45223423@example.com>")
1049n/a art_num, message_id, lines = info
1050n/a self.assertEqual(art_num, 0)
1051n/a self.assertEqual(message_id, "<45223423@example.com>")
1052n/a self._check_article_body(lines)
1053n/a # Non-existent id
1054n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1055n/a self.server.body("<non-existent@example.com>")
1056n/a self.assertEqual(cm.exception.response, "430 No Such Article Found")
1058n/a def test_body_file(self):
1059n/a f = io.BytesIO()
1060n/a resp, info = self.server.body(file=f)
1061n/a self.assertEqual(resp, "222 3000237 <45223423@example.com>")
1062n/a art_num, message_id, lines = info
1063n/a self.assertEqual(art_num, 3000237)
1064n/a self.assertEqual(message_id, "<45223423@example.com>")
1065n/a self.assertEqual(lines, [])
1066n/a data = f.getvalue()
1067n/a self.assertFalse(data.startswith(
1068n/a b'From: "Demo User" <nobody@example.net>\r\n'
1069n/a b'Subject: I am just a test article\r\n'
1070n/a ), ascii(data))
1071n/a self.assertTrue(data.endswith(
1072n/a b'This is just a test article.\r\n'
1073n/a b'.Here is a dot-starting line.\r\n'
1074n/a b'\r\n'
1075n/a b'-- Signed by Andr\xc3\xa9.\r\n'
1076n/a ), ascii(data))
1078n/a def check_over_xover_resp(self, resp, overviews):
1079n/a self.assertTrue(resp.startswith("224 "), resp)
1080n/a self.assertEqual(len(overviews), 3)
1081n/a art_num, over = overviews[0]
1082n/a self.assertEqual(art_num, 57)
1083n/a self.assertEqual(over, {
1084n/a "from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
1085n/a "subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
1086n/a "date": "Sat, 19 Jun 2010 18:04:08 -0400",
1087n/a "message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
1088n/a "references": "<hvalf7$ort$1@dough.gmane.org>",
1089n/a ":bytes": "7103",
1090n/a ":lines": "16",
1091n/a "xref": "news.gmane.org gmane.comp.python.authors:57"
1092n/a })
1093n/a art_num, over = overviews[1]
1094n/a self.assertEqual(over["xref"], None)
1095n/a art_num, over = overviews[2]
1096n/a self.assertEqual(over["subject"],
1097n/a "Re: Message d'erreur incompréhensible (par moi)")
1099n/a def test_xover(self):
1100n/a resp, overviews = self.server.xover(57, 59)
1101n/a self.check_over_xover_resp(resp, overviews)
1103n/a def test_over(self):
1104n/a # In NNTP "v1", this will fallback on XOVER
1105n/a resp, overviews = self.server.over((57, 59))
1106n/a self.check_over_xover_resp(resp, overviews)
1108n/a sample_post = (
1109n/a b'From: "Demo User" <nobody@example.net>\r\n'
1110n/a b'Subject: I am just a test article\r\n'
1111n/a b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
1112n/a b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
1113n/a b'\r\n'
1114n/a b'This is just a test article.\r\n'
1115n/a b'.Here is a dot-starting line.\r\n'
1116n/a b'\r\n'
1117n/a b'-- Signed by Andr\xc3\xa9.\r\n'
1118n/a )
1120n/a def _check_posted_body(self):
1121n/a # Check the raw body as received by the server
1122n/a lines = self.handler.posted_body
1123n/a # One additional line for the "." terminator
1124n/a self.assertEqual(len(lines), 10)
1125n/a self.assertEqual(lines[-1], b'.\r\n')
1126n/a self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
1127n/a self.assertEqual(lines[-3], b'\r\n')
1128n/a self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
1129n/a self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
1131n/a def _check_post_ihave_sub(self, func, *args, file_factory):
1132n/a # First the prepared post with CRLF endings
1133n/a post = self.sample_post
1134n/a func_args = args + (file_factory(post),)
1135n/a self.handler.posted_body = None
1136n/a resp = func(*func_args)
1137n/a self._check_posted_body()
1138n/a # Then the same post with "normal" line endings - they should be
1139n/a # converted by NNTP.post and NNTP.ihave.
1140n/a post = self.sample_post.replace(b"\r\n", b"\n")
1141n/a func_args = args + (file_factory(post),)
1142n/a self.handler.posted_body = None
1143n/a resp = func(*func_args)
1144n/a self._check_posted_body()
1145n/a return resp
1147n/a def check_post_ihave(self, func, success_resp, *args):
1148n/a # With a bytes object
1149n/a resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
1150n/a self.assertEqual(resp, success_resp)
1151n/a # With a bytearray object
1152n/a resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
1153n/a self.assertEqual(resp, success_resp)
1154n/a # With a file object
1155n/a resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
1156n/a self.assertEqual(resp, success_resp)
1157n/a # With an iterable of terminated lines
1158n/a def iterlines(b):
1159n/a return iter(b.splitlines(keepends=True))
1160n/a resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1161n/a self.assertEqual(resp, success_resp)
1162n/a # With an iterable of non-terminated lines
1163n/a def iterlines(b):
1164n/a return iter(b.splitlines(keepends=False))
1165n/a resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
1166n/a self.assertEqual(resp, success_resp)
1168n/a def test_post(self):
1169n/a self.check_post_ihave(self.server.post, "240 Article received OK")
1170n/a self.handler.allow_posting = False
1171n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1172n/a self.server.post(self.sample_post)
1173n/a self.assertEqual(cm.exception.response,
1174n/a "440 Posting not permitted")
1176n/a def test_ihave(self):
1177n/a self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
1178n/a "<i.am.an.article.you.will.want@example.com>")
1179n/a with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
1180n/a self.server.ihave("<another.message.id>", self.sample_post)
1181n/a self.assertEqual(cm.exception.response,
1182n/a "435 Article not wanted")
1184n/a def test_too_long_lines(self):
1185n/a dt = datetime.datetime(2010, 1, 1, 9, 0, 0)
1186n/a self.assertRaises(nntplib.NNTPDataError,
1187n/a self.server.newnews, "comp.lang.python", dt)
1190n/aclass NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1191n/a """Tests an NNTP v1 server (no capabilities)."""
1193n/a nntp_version = 1
1194n/a handler_class = NNTPv1Handler
1196n/a def test_caps(self):
1197n/a caps = self.server.getcapabilities()
1198n/a self.assertEqual(caps, {})
1199n/a self.assertEqual(self.server.nntp_version, 1)
1200n/a self.assertEqual(self.server.nntp_implementation, None)
1203n/aclass NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
1204n/a """Tests an NNTP v2 server (with capabilities)."""
1206n/a nntp_version = 2
1207n/a handler_class = NNTPv2Handler
1209n/a def test_caps(self):
1210n/a caps = self.server.getcapabilities()
1211n/a self.assertEqual(caps, {
1212n/a 'VERSION': ['2', '3'],
1213n/a 'IMPLEMENTATION': ['INN', '2.5.1'],
1214n/a 'AUTHINFO': ['USER'],
1215n/a 'HDR': [],
1218n/a 'OVER': [],
1219n/a 'POST': [],
1220n/a 'READER': [],
1221n/a })
1222n/a self.assertEqual(self.server.nntp_version, 3)
1223n/a self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
1226n/aclass CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
1227n/a """Tests a probably NNTP v2 server with capabilities only after login."""
1229n/a nntp_version = 2
1230n/a handler_class = CapsAfterLoginNNTPv2Handler
1232n/a def test_caps_only_after_login(self):
1233n/a self.assertEqual(self.server._caps, {})
1234n/a self.server.login('testuser', 'testpw')
1235n/a self.assertIn('VERSION', self.server._caps)
1238n/aclass SendReaderNNTPv2Tests(MockedNNTPWithReaderModeMixin,
1239n/a unittest.TestCase):
1240n/a """Same tests as for v2 but we tell NTTP to send MODE READER to a server
1241n/a that isn't in READER mode by default."""
1243n/a nntp_version = 2
1244n/a handler_class = ModeSwitchingNNTPv2Handler
1246n/a def test_we_are_in_reader_mode_after_connect(self):
1247n/a self.assertIn('READER', self.server._caps)
1250n/aclass MiscTests(unittest.TestCase):
1252n/a def test_decode_header(self):
1253n/a def gives(a, b):
1254n/a self.assertEqual(nntplib.decode_header(a), b)
1255n/a gives("" , "")
1256n/a gives("a plain header", "a plain header")
1257n/a gives(" with extra spaces ", " with extra spaces ")
1258n/a gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
1259n/a gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
1260n/a " =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
1261n/a "Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
1262n/a gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
1263n/a "Re: problème de matrice")
1264n/a # A natively utf-8 header (found in the real world!)
1265n/a gives("Re: Message d'erreur incompréhensible (par moi)",
1266n/a "Re: Message d'erreur incompréhensible (par moi)")
1268n/a def test_parse_overview_fmt(self):
1269n/a # The minimal (default) response
1270n/a lines = ["Subject:", "From:", "Date:", "Message-ID:",
1271n/a "References:", ":bytes", ":lines"]
1272n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1273n/a ["subject", "from", "date", "message-id", "references",
1274n/a ":bytes", ":lines"])
1275n/a # The minimal response using alternative names
1276n/a lines = ["Subject:", "From:", "Date:", "Message-ID:",
1277n/a "References:", "Bytes:", "Lines:"]
1278n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1279n/a ["subject", "from", "date", "message-id", "references",
1280n/a ":bytes", ":lines"])
1281n/a # Variations in casing
1282n/a lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
1283n/a "References:", "BYTES:", "Lines:"]
1284n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1285n/a ["subject", "from", "date", "message-id", "references",
1286n/a ":bytes", ":lines"])
1287n/a # First example from RFC 3977
1288n/a lines = ["Subject:", "From:", "Date:", "Message-ID:",
1289n/a "References:", ":bytes", ":lines", "Xref:full",
1290n/a "Distribution:full"]
1291n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1292n/a ["subject", "from", "date", "message-id", "references",
1293n/a ":bytes", ":lines", "xref", "distribution"])
1294n/a # Second example from RFC 3977
1295n/a lines = ["Subject:", "From:", "Date:", "Message-ID:",
1296n/a "References:", "Bytes:", "Lines:", "Xref:FULL",
1297n/a "Distribution:FULL"]
1298n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1299n/a ["subject", "from", "date", "message-id", "references",
1300n/a ":bytes", ":lines", "xref", "distribution"])
1301n/a # A classic response from INN
1302n/a lines = ["Subject:", "From:", "Date:", "Message-ID:",
1303n/a "References:", "Bytes:", "Lines:", "Xref:full"]
1304n/a self.assertEqual(nntplib._parse_overview_fmt(lines),
1305n/a ["subject", "from", "date", "message-id", "references",
1306n/a ":bytes", ":lines", "xref"])
1308n/a def test_parse_overview(self):
1309n/a fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
1310n/a # First example from RFC 3977
1311n/a lines = [
1312n/a '3000234\tI am just a test article\t"Demo User" '
1313n/a '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1314n/a '<45223423@example.com>\t<45454@example.net>\t1234\t'
1315n/a '17\tXref: news.example.com misc.test:3000363',
1316n/a ]
1317n/a overview = nntplib._parse_overview(lines, fmt)
1318n/a (art_num, fields), = overview
1319n/a self.assertEqual(art_num, 3000234)
1320n/a self.assertEqual(fields, {
1321n/a 'subject': 'I am just a test article',
1322n/a 'from': '"Demo User" <nobody@example.com>',
1323n/a 'date': '6 Oct 1998 04:38:40 -0500',
1324n/a 'message-id': '<45223423@example.com>',
1325n/a 'references': '<45454@example.net>',
1326n/a ':bytes': '1234',
1327n/a ':lines': '17',
1328n/a 'xref': 'news.example.com misc.test:3000363',
1329n/a })
1330n/a # Second example; here the "Xref" field is totally absent (including
1331n/a # the header name) and comes out as None
1332n/a lines = [
1333n/a '3000234\tI am just a test article\t"Demo User" '
1334n/a '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1335n/a '<45223423@example.com>\t<45454@example.net>\t1234\t'
1336n/a '17\t\t',
1337n/a ]
1338n/a overview = nntplib._parse_overview(lines, fmt)
1339n/a (art_num, fields), = overview
1340n/a self.assertEqual(fields['xref'], None)
1341n/a # Third example; the "Xref" is an empty string, while "references"
1342n/a # is a single space.
1343n/a lines = [
1344n/a '3000234\tI am just a test article\t"Demo User" '
1345n/a '<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
1346n/a '<45223423@example.com>\t \t1234\t'
1347n/a '17\tXref: \t',
1348n/a ]
1349n/a overview = nntplib._parse_overview(lines, fmt)
1350n/a (art_num, fields), = overview
1351n/a self.assertEqual(fields['references'], ' ')
1352n/a self.assertEqual(fields['xref'], '')
1354n/a def test_parse_datetime(self):
1355n/a def gives(a, b, *c):
1356n/a self.assertEqual(nntplib._parse_datetime(a, b),
1357n/a datetime.datetime(*c))
1358n/a # Output of DATE command
1359n/a gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
1360n/a # Variations
1361n/a gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
1362n/a gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
1363n/a gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
1365n/a def test_unparse_datetime(self):
1366n/a # Test non-legacy mode
1367n/a # 1) with a datetime
1368n/a def gives(y, M, d, h, m, s, date_str, time_str):
1369n/a dt = datetime.datetime(y, M, d, h, m, s)
1370n/a self.assertEqual(nntplib._unparse_datetime(dt),
1371n/a (date_str, time_str))
1372n/a self.assertEqual(nntplib._unparse_datetime(dt, False),
1373n/a (date_str, time_str))
1374n/a gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
1375n/a gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
1376n/a gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
1377n/a # 2) with a date
1378n/a def gives(y, M, d, date_str, time_str):
1379n/a dt = datetime.date(y, M, d)
1380n/a self.assertEqual(nntplib._unparse_datetime(dt),
1381n/a (date_str, time_str))
1382n/a self.assertEqual(nntplib._unparse_datetime(dt, False),
1383n/a (date_str, time_str))
1384n/a gives(1999, 6, 23, "19990623", "000000")
1385n/a gives(2000, 6, 23, "20000623", "000000")
1386n/a gives(2010, 6, 5, "20100605", "000000")
1388n/a def test_unparse_datetime_legacy(self):
1389n/a # Test legacy mode (RFC 977)
1390n/a # 1) with a datetime
1391n/a def gives(y, M, d, h, m, s, date_str, time_str):
1392n/a dt = datetime.datetime(y, M, d, h, m, s)
1393n/a self.assertEqual(nntplib._unparse_datetime(dt, True),
1394n/a (date_str, time_str))
1395n/a gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
1396n/a gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
1397n/a gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
1398n/a # 2) with a date
1399n/a def gives(y, M, d, date_str, time_str):
1400n/a dt = datetime.date(y, M, d)
1401n/a self.assertEqual(nntplib._unparse_datetime(dt, True),
1402n/a (date_str, time_str))
1403n/a gives(1999, 6, 23, "990623", "000000")
1404n/a gives(2000, 6, 23, "000623", "000000")
1405n/a gives(2010, 6, 5, "100605", "000000")
1407n/a @unittest.skipUnless(ssl, 'requires SSL support')
1408n/a def test_ssl_support(self):
1409n/a self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
1412n/aclass PublicAPITests(unittest.TestCase):
1413n/a """Ensures that the correct values are exposed in the public API."""
1415n/a def test_module_all_attribute(self):
1416n/a self.assertTrue(hasattr(nntplib, '__all__'))
1417n/a target_api = ['NNTP', 'NNTPError', 'NNTPReplyError',
1418n/a 'NNTPTemporaryError', 'NNTPPermanentError',
1419n/a 'NNTPProtocolError', 'NNTPDataError', 'decode_header']
1420n/a if ssl is not None:
1421n/a target_api.append('NNTP_SSL')
1422n/a self.assertEqual(set(nntplib.__all__), set(target_api))
1424n/aclass MockSocketTests(unittest.TestCase):
1425n/a """Tests involving a mock socket object
1427n/a Used where the _NNTPServerIO file object is not enough."""
1429n/a nntp_class = nntplib.NNTP
1431n/a def check_constructor_error_conditions(
1432n/a self, handler_class,
1433n/a expected_error_type, expected_error_msg,
1434n/a login=None, password=None):
1436n/a class mock_socket_module:
1437n/a def create_connection(address, timeout):
1438n/a return MockSocket()
1440n/a class MockSocket:
1441n/a def close(self):
1442n/a nonlocal socket_closed
1443n/a socket_closed = True
1445n/a def makefile(socket, mode):
1446n/a handler = handler_class()
1447n/a _, file = make_mock_file(handler)
1448n/a files.append(file)
1449n/a return file
1451n/a socket_closed = False
1452n/a files = []
1453n/a with patch('nntplib.socket', mock_socket_module), \
1454n/a self.assertRaisesRegex(expected_error_type, expected_error_msg):
1455n/a self.nntp_class('dummy', user=login, password=password)
1456n/a self.assertTrue(socket_closed)
1457n/a for f in files:
1458n/a self.assertTrue(f.closed)
1460n/a def test_bad_welcome(self):
1461n/a #Test a bad welcome message
1462n/a class Handler(NNTPv1Handler):
1463n/a welcome = 'Bad Welcome'
1464n/a self.check_constructor_error_conditions(
1465n/a Handler, nntplib.NNTPProtocolError, Handler.welcome)
1467n/a def test_service_temporarily_unavailable(self):
1468n/a #Test service temporarily unavailable
1469n/a class Handler(NNTPv1Handler):
1470n/a welcome = '400 Service temporarily unavailable'
1471n/a self.check_constructor_error_conditions(
1472n/a Handler, nntplib.NNTPTemporaryError, Handler.welcome)
1474n/a def test_service_permanently_unavailable(self):
1475n/a #Test service permanently unavailable
1476n/a class Handler(NNTPv1Handler):
1477n/a welcome = '502 Service permanently unavailable'
1478n/a self.check_constructor_error_conditions(
1479n/a Handler, nntplib.NNTPPermanentError, Handler.welcome)
1481n/a def test_bad_capabilities(self):
1482n/a #Test a bad capabilities response
1483n/a class Handler(NNTPv1Handler):
1484n/a def handle_CAPABILITIES(self):
1485n/a self.push_lit(capabilities_response)
1486n/a capabilities_response = '201 bad capability'
1487n/a self.check_constructor_error_conditions(
1488n/a Handler, nntplib.NNTPReplyError, capabilities_response)
1490n/a def test_login_aborted(self):
1491n/a #Test a bad authinfo response
1492n/a login = 't@e.com'
1493n/a password = 'python'
1494n/a class Handler(NNTPv1Handler):
1495n/a def handle_AUTHINFO(self, *args):
1496n/a self.push_lit(authinfo_response)
1497n/a authinfo_response = '503 Mechanism not recognized'
1498n/a self.check_constructor_error_conditions(
1499n/a Handler, nntplib.NNTPPermanentError, authinfo_response,
1500n/a login, password)
1502n/aclass bypass_context:
1503n/a """Bypass encryption and actual SSL module"""
1504n/a def wrap_socket(sock, **args):
1505n/a return sock
1507n/a@unittest.skipUnless(ssl, 'requires SSL support')
1508n/aclass MockSslTests(MockSocketTests):
1509n/a @staticmethod
1510n/a def nntp_class(*pos, **kw):
1511n/a return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
1513n/a@unittest.skipUnless(threading, 'requires multithreading')
1514n/aclass LocalServerTests(unittest.TestCase):
1515n/a def setUp(self):
1516n/a sock = socket.socket()
1517n/a port = support.bind_port(sock)
1518n/a sock.listen()
1519n/a self.background = threading.Thread(
1520n/a target=self.run_server, args=(sock,))
1521n/a self.background.start()
1522n/a self.addCleanup(self.background.join)
1524n/a self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__()
1525n/a self.addCleanup(self.nntp.__exit__, None, None, None)
1527n/a def run_server(self, sock):
1528n/a # Could be generalized to handle more commands in separate methods
1529n/a with sock:
1530n/a [client, _] = sock.accept()
1531n/a with contextlib.ExitStack() as cleanup:
1532n/a cleanup.enter_context(client)
1533n/a reader = cleanup.enter_context(client.makefile('rb'))
1534n/a client.sendall(b'200 Server ready\r\n')
1535n/a while True:
1536n/a cmd = reader.readline()
1537n/a if cmd == b'CAPABILITIES\r\n':
1538n/a client.sendall(
1539n/a b'101 Capability list:\r\n'
1540n/a b'VERSION 2\r\n'
1541n/a b'STARTTLS\r\n'
1542n/a b'.\r\n'
1543n/a )
1544n/a elif cmd == b'STARTTLS\r\n':
1545n/a reader.close()
1546n/a client.sendall(b'382 Begin TLS negotiation now\r\n')
1547n/a context = ssl.SSLContext()
1548n/a context.load_cert_chain(certfile)
1549n/a client = context.wrap_socket(
1550n/a client, server_side=True)
1551n/a cleanup.enter_context(client)
1552n/a reader = cleanup.enter_context(client.makefile('rb'))
1553n/a elif cmd == b'QUIT\r\n':
1554n/a client.sendall(b'205 Bye!\r\n')
1555n/a break
1556n/a else:
1557n/a raise ValueError('Unexpected command {!r}'.format(cmd))
1559n/a @unittest.skipUnless(ssl, 'requires SSL support')
1560n/a def test_starttls(self):
1561n/a file = self.nntp.file
1562n/a sock = self.nntp.sock
1563n/a self.nntp.starttls()
1564n/a # Check that the socket and internal pseudo-file really were
1565n/a # changed.
1566n/a self.assertNotEqual(file, self.nntp.file)
1567n/a self.assertNotEqual(sock, self.nntp.sock)
1568n/a # Check that the new socket really is an SSL one
1569n/a self.assertIsInstance(self.nntp.sock, ssl.SSLSocket)
1570n/a # Check that trying starttls when it's already active fails.
1571n/a self.assertRaises(ValueError, self.nntp.starttls)
1574n/aif __name__ == "__main__":
1575n/a unittest.main()