ยปCore Development>Code coverage>Lib/test/test_logging.py

Python code coverage for Lib/test/test_logging.py

#countcontent
1n/a# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
2n/a#
3n/a# Permission to use, copy, modify, and distribute this software and its
4n/a# documentation for any purpose and without fee is hereby granted,
5n/a# provided that the above copyright notice appear in all copies and that
6n/a# both that copyright notice and this permission notice appear in
7n/a# supporting documentation, and that the name of Vinay Sajip
8n/a# not be used in advertising or publicity pertaining to distribution
9n/a# of the software without specific, written prior permission.
10n/a# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11n/a# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12n/a# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13n/a# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14n/a# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15n/a# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16n/a
17n/a"""Test harness for the logging module. Run all tests.
18n/a
19n/aCopyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
20n/a"""
21n/a
22n/aimport logging
23n/aimport logging.handlers
24n/aimport logging.config
25n/a
26n/aimport codecs
27n/aimport configparser
28n/aimport datetime
29n/aimport pathlib
30n/aimport pickle
31n/aimport io
32n/aimport gc
33n/aimport json
34n/aimport os
35n/aimport queue
36n/aimport random
37n/aimport re
38n/aimport socket
39n/aimport struct
40n/aimport sys
41n/aimport tempfile
42n/afrom test.support.script_helper import assert_python_ok
43n/afrom test import support
44n/aimport textwrap
45n/aimport time
46n/aimport unittest
47n/aimport warnings
48n/aimport weakref
49n/atry:
50n/a import threading
51n/a # The following imports are needed only for tests which
52n/a # require threading
53n/a import asyncore
54n/a from http.server import HTTPServer, BaseHTTPRequestHandler
55n/a import smtpd
56n/a from urllib.parse import urlparse, parse_qs
57n/a from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
58n/a ThreadingTCPServer, StreamRequestHandler)
59n/aexcept ImportError:
60n/a threading = None
61n/atry:
62n/a import win32evtlog, win32evtlogutil, pywintypes
63n/aexcept ImportError:
64n/a win32evtlog = win32evtlogutil = pywintypes = None
65n/a
66n/atry:
67n/a import zlib
68n/aexcept ImportError:
69n/a pass
70n/a
71n/aclass BaseTest(unittest.TestCase):
72n/a
73n/a """Base class for logging tests."""
74n/a
75n/a log_format = "%(name)s -> %(levelname)s: %(message)s"
76n/a expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
77n/a message_num = 0
78n/a
79n/a def setUp(self):
80n/a """Setup the default logging stream to an internal StringIO instance,
81n/a so that we can examine log output as we want."""
82n/a logger_dict = logging.getLogger().manager.loggerDict
83n/a logging._acquireLock()
84n/a try:
85n/a self.saved_handlers = logging._handlers.copy()
86n/a self.saved_handler_list = logging._handlerList[:]
87n/a self.saved_loggers = saved_loggers = logger_dict.copy()
88n/a self.saved_name_to_level = logging._nameToLevel.copy()
89n/a self.saved_level_to_name = logging._levelToName.copy()
90n/a self.logger_states = logger_states = {}
91n/a for name in saved_loggers:
92n/a logger_states[name] = getattr(saved_loggers[name],
93n/a 'disabled', None)
94n/a finally:
95n/a logging._releaseLock()
96n/a
97n/a # Set two unused loggers
98n/a self.logger1 = logging.getLogger("\xab\xd7\xbb")
99n/a self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
100n/a
101n/a self.root_logger = logging.getLogger("")
102n/a self.original_logging_level = self.root_logger.getEffectiveLevel()
103n/a
104n/a self.stream = io.StringIO()
105n/a self.root_logger.setLevel(logging.DEBUG)
106n/a self.root_hdlr = logging.StreamHandler(self.stream)
107n/a self.root_formatter = logging.Formatter(self.log_format)
108n/a self.root_hdlr.setFormatter(self.root_formatter)
109n/a if self.logger1.hasHandlers():
110n/a hlist = self.logger1.handlers + self.root_logger.handlers
111n/a raise AssertionError('Unexpected handlers: %s' % hlist)
112n/a if self.logger2.hasHandlers():
113n/a hlist = self.logger2.handlers + self.root_logger.handlers
114n/a raise AssertionError('Unexpected handlers: %s' % hlist)
115n/a self.root_logger.addHandler(self.root_hdlr)
116n/a self.assertTrue(self.logger1.hasHandlers())
117n/a self.assertTrue(self.logger2.hasHandlers())
118n/a
119n/a def tearDown(self):
120n/a """Remove our logging stream, and restore the original logging
121n/a level."""
122n/a self.stream.close()
123n/a self.root_logger.removeHandler(self.root_hdlr)
124n/a while self.root_logger.handlers:
125n/a h = self.root_logger.handlers[0]
126n/a self.root_logger.removeHandler(h)
127n/a h.close()
128n/a self.root_logger.setLevel(self.original_logging_level)
129n/a logging._acquireLock()
130n/a try:
131n/a logging._levelToName.clear()
132n/a logging._levelToName.update(self.saved_level_to_name)
133n/a logging._nameToLevel.clear()
134n/a logging._nameToLevel.update(self.saved_name_to_level)
135n/a logging._handlers.clear()
136n/a logging._handlers.update(self.saved_handlers)
137n/a logging._handlerList[:] = self.saved_handler_list
138n/a loggerDict = logging.getLogger().manager.loggerDict
139n/a loggerDict.clear()
140n/a loggerDict.update(self.saved_loggers)
141n/a logger_states = self.logger_states
142n/a for name in self.logger_states:
143n/a if logger_states[name] is not None:
144n/a self.saved_loggers[name].disabled = logger_states[name]
145n/a finally:
146n/a logging._releaseLock()
147n/a
148n/a def assert_log_lines(self, expected_values, stream=None, pat=None):
149n/a """Match the collected log lines against the regular expression
150n/a self.expected_log_pat, and compare the extracted group values to
151n/a the expected_values list of tuples."""
152n/a stream = stream or self.stream
153n/a pat = re.compile(pat or self.expected_log_pat)
154n/a actual_lines = stream.getvalue().splitlines()
155n/a self.assertEqual(len(actual_lines), len(expected_values))
156n/a for actual, expected in zip(actual_lines, expected_values):
157n/a match = pat.search(actual)
158n/a if not match:
159n/a self.fail("Log line does not match expected pattern:\n" +
160n/a actual)
161n/a self.assertEqual(tuple(match.groups()), expected)
162n/a s = stream.read()
163n/a if s:
164n/a self.fail("Remaining output at end of log stream:\n" + s)
165n/a
166n/a def next_message(self):
167n/a """Generate a message consisting solely of an auto-incrementing
168n/a integer."""
169n/a self.message_num += 1
170n/a return "%d" % self.message_num
171n/a
172n/a
173n/aclass BuiltinLevelsTest(BaseTest):
174n/a """Test builtin levels and their inheritance."""
175n/a
176n/a def test_flat(self):
177n/a #Logging levels in a flat logger namespace.
178n/a m = self.next_message
179n/a
180n/a ERR = logging.getLogger("ERR")
181n/a ERR.setLevel(logging.ERROR)
182n/a INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
183n/a INF.setLevel(logging.INFO)
184n/a DEB = logging.getLogger("DEB")
185n/a DEB.setLevel(logging.DEBUG)
186n/a
187n/a # These should log.
188n/a ERR.log(logging.CRITICAL, m())
189n/a ERR.error(m())
190n/a
191n/a INF.log(logging.CRITICAL, m())
192n/a INF.error(m())
193n/a INF.warning(m())
194n/a INF.info(m())
195n/a
196n/a DEB.log(logging.CRITICAL, m())
197n/a DEB.error(m())
198n/a DEB.warning(m())
199n/a DEB.info(m())
200n/a DEB.debug(m())
201n/a
202n/a # These should not log.
203n/a ERR.warning(m())
204n/a ERR.info(m())
205n/a ERR.debug(m())
206n/a
207n/a INF.debug(m())
208n/a
209n/a self.assert_log_lines([
210n/a ('ERR', 'CRITICAL', '1'),
211n/a ('ERR', 'ERROR', '2'),
212n/a ('INF', 'CRITICAL', '3'),
213n/a ('INF', 'ERROR', '4'),
214n/a ('INF', 'WARNING', '5'),
215n/a ('INF', 'INFO', '6'),
216n/a ('DEB', 'CRITICAL', '7'),
217n/a ('DEB', 'ERROR', '8'),
218n/a ('DEB', 'WARNING', '9'),
219n/a ('DEB', 'INFO', '10'),
220n/a ('DEB', 'DEBUG', '11'),
221n/a ])
222n/a
223n/a def test_nested_explicit(self):
224n/a # Logging levels in a nested namespace, all explicitly set.
225n/a m = self.next_message
226n/a
227n/a INF = logging.getLogger("INF")
228n/a INF.setLevel(logging.INFO)
229n/a INF_ERR = logging.getLogger("INF.ERR")
230n/a INF_ERR.setLevel(logging.ERROR)
231n/a
232n/a # These should log.
233n/a INF_ERR.log(logging.CRITICAL, m())
234n/a INF_ERR.error(m())
235n/a
236n/a # These should not log.
237n/a INF_ERR.warning(m())
238n/a INF_ERR.info(m())
239n/a INF_ERR.debug(m())
240n/a
241n/a self.assert_log_lines([
242n/a ('INF.ERR', 'CRITICAL', '1'),
243n/a ('INF.ERR', 'ERROR', '2'),
244n/a ])
245n/a
246n/a def test_nested_inherited(self):
247n/a #Logging levels in a nested namespace, inherited from parent loggers.
248n/a m = self.next_message
249n/a
250n/a INF = logging.getLogger("INF")
251n/a INF.setLevel(logging.INFO)
252n/a INF_ERR = logging.getLogger("INF.ERR")
253n/a INF_ERR.setLevel(logging.ERROR)
254n/a INF_UNDEF = logging.getLogger("INF.UNDEF")
255n/a INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
256n/a UNDEF = logging.getLogger("UNDEF")
257n/a
258n/a # These should log.
259n/a INF_UNDEF.log(logging.CRITICAL, m())
260n/a INF_UNDEF.error(m())
261n/a INF_UNDEF.warning(m())
262n/a INF_UNDEF.info(m())
263n/a INF_ERR_UNDEF.log(logging.CRITICAL, m())
264n/a INF_ERR_UNDEF.error(m())
265n/a
266n/a # These should not log.
267n/a INF_UNDEF.debug(m())
268n/a INF_ERR_UNDEF.warning(m())
269n/a INF_ERR_UNDEF.info(m())
270n/a INF_ERR_UNDEF.debug(m())
271n/a
272n/a self.assert_log_lines([
273n/a ('INF.UNDEF', 'CRITICAL', '1'),
274n/a ('INF.UNDEF', 'ERROR', '2'),
275n/a ('INF.UNDEF', 'WARNING', '3'),
276n/a ('INF.UNDEF', 'INFO', '4'),
277n/a ('INF.ERR.UNDEF', 'CRITICAL', '5'),
278n/a ('INF.ERR.UNDEF', 'ERROR', '6'),
279n/a ])
280n/a
281n/a def test_nested_with_virtual_parent(self):
282n/a # Logging levels when some parent does not exist yet.
283n/a m = self.next_message
284n/a
285n/a INF = logging.getLogger("INF")
286n/a GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
287n/a CHILD = logging.getLogger("INF.BADPARENT")
288n/a INF.setLevel(logging.INFO)
289n/a
290n/a # These should log.
291n/a GRANDCHILD.log(logging.FATAL, m())
292n/a GRANDCHILD.info(m())
293n/a CHILD.log(logging.FATAL, m())
294n/a CHILD.info(m())
295n/a
296n/a # These should not log.
297n/a GRANDCHILD.debug(m())
298n/a CHILD.debug(m())
299n/a
300n/a self.assert_log_lines([
301n/a ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
302n/a ('INF.BADPARENT.UNDEF', 'INFO', '2'),
303n/a ('INF.BADPARENT', 'CRITICAL', '3'),
304n/a ('INF.BADPARENT', 'INFO', '4'),
305n/a ])
306n/a
307n/a def test_regression_22386(self):
308n/a """See issue #22386 for more information."""
309n/a self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
310n/a self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
311n/a
312n/a def test_regression_29220(self):
313n/a """See issue #29220 for more information."""
314n/a logging.addLevelName(logging.INFO, '')
315n/a self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
316n/a self.assertEqual(logging.getLevelName(logging.INFO), '')
317n/a
318n/a def test_issue27935(self):
319n/a fatal = logging.getLevelName('FATAL')
320n/a self.assertEqual(fatal, logging.FATAL)
321n/a
322n/a def test_regression_29220(self):
323n/a """See issue #29220 for more information."""
324n/a logging.addLevelName(logging.INFO, '')
325n/a self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
326n/a self.assertEqual(logging.getLevelName(logging.INFO), '')
327n/a self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
328n/a self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
329n/a
330n/aclass BasicFilterTest(BaseTest):
331n/a
332n/a """Test the bundled Filter class."""
333n/a
334n/a def test_filter(self):
335n/a # Only messages satisfying the specified criteria pass through the
336n/a # filter.
337n/a filter_ = logging.Filter("spam.eggs")
338n/a handler = self.root_logger.handlers[0]
339n/a try:
340n/a handler.addFilter(filter_)
341n/a spam = logging.getLogger("spam")
342n/a spam_eggs = logging.getLogger("spam.eggs")
343n/a spam_eggs_fish = logging.getLogger("spam.eggs.fish")
344n/a spam_bakedbeans = logging.getLogger("spam.bakedbeans")
345n/a
346n/a spam.info(self.next_message())
347n/a spam_eggs.info(self.next_message()) # Good.
348n/a spam_eggs_fish.info(self.next_message()) # Good.
349n/a spam_bakedbeans.info(self.next_message())
350n/a
351n/a self.assert_log_lines([
352n/a ('spam.eggs', 'INFO', '2'),
353n/a ('spam.eggs.fish', 'INFO', '3'),
354n/a ])
355n/a finally:
356n/a handler.removeFilter(filter_)
357n/a
358n/a def test_callable_filter(self):
359n/a # Only messages satisfying the specified criteria pass through the
360n/a # filter.
361n/a
362n/a def filterfunc(record):
363n/a parts = record.name.split('.')
364n/a prefix = '.'.join(parts[:2])
365n/a return prefix == 'spam.eggs'
366n/a
367n/a handler = self.root_logger.handlers[0]
368n/a try:
369n/a handler.addFilter(filterfunc)
370n/a spam = logging.getLogger("spam")
371n/a spam_eggs = logging.getLogger("spam.eggs")
372n/a spam_eggs_fish = logging.getLogger("spam.eggs.fish")
373n/a spam_bakedbeans = logging.getLogger("spam.bakedbeans")
374n/a
375n/a spam.info(self.next_message())
376n/a spam_eggs.info(self.next_message()) # Good.
377n/a spam_eggs_fish.info(self.next_message()) # Good.
378n/a spam_bakedbeans.info(self.next_message())
379n/a
380n/a self.assert_log_lines([
381n/a ('spam.eggs', 'INFO', '2'),
382n/a ('spam.eggs.fish', 'INFO', '3'),
383n/a ])
384n/a finally:
385n/a handler.removeFilter(filterfunc)
386n/a
387n/a def test_empty_filter(self):
388n/a f = logging.Filter()
389n/a r = logging.makeLogRecord({'name': 'spam.eggs'})
390n/a self.assertTrue(f.filter(r))
391n/a
392n/a#
393n/a# First, we define our levels. There can be as many as you want - the only
394n/a# limitations are that they should be integers, the lowest should be > 0 and
395n/a# larger values mean less information being logged. If you need specific
396n/a# level values which do not fit into these limitations, you can use a
397n/a# mapping dictionary to convert between your application levels and the
398n/a# logging system.
399n/a#
400n/aSILENT = 120
401n/aTACITURN = 119
402n/aTERSE = 118
403n/aEFFUSIVE = 117
404n/aSOCIABLE = 116
405n/aVERBOSE = 115
406n/aTALKATIVE = 114
407n/aGARRULOUS = 113
408n/aCHATTERBOX = 112
409n/aBORING = 111
410n/a
411n/aLEVEL_RANGE = range(BORING, SILENT + 1)
412n/a
413n/a#
414n/a# Next, we define names for our levels. You don't need to do this - in which
415n/a# case the system will use "Level n" to denote the text for the level.
416n/a#
417n/amy_logging_levels = {
418n/a SILENT : 'Silent',
419n/a TACITURN : 'Taciturn',
420n/a TERSE : 'Terse',
421n/a EFFUSIVE : 'Effusive',
422n/a SOCIABLE : 'Sociable',
423n/a VERBOSE : 'Verbose',
424n/a TALKATIVE : 'Talkative',
425n/a GARRULOUS : 'Garrulous',
426n/a CHATTERBOX : 'Chatterbox',
427n/a BORING : 'Boring',
428n/a}
429n/a
430n/aclass GarrulousFilter(logging.Filter):
431n/a
432n/a """A filter which blocks garrulous messages."""
433n/a
434n/a def filter(self, record):
435n/a return record.levelno != GARRULOUS
436n/a
437n/aclass VerySpecificFilter(logging.Filter):
438n/a
439n/a """A filter which blocks sociable and taciturn messages."""
440n/a
441n/a def filter(self, record):
442n/a return record.levelno not in [SOCIABLE, TACITURN]
443n/a
444n/a
445n/aclass CustomLevelsAndFiltersTest(BaseTest):
446n/a
447n/a """Test various filtering possibilities with custom logging levels."""
448n/a
449n/a # Skip the logger name group.
450n/a expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
451n/a
452n/a def setUp(self):
453n/a BaseTest.setUp(self)
454n/a for k, v in my_logging_levels.items():
455n/a logging.addLevelName(k, v)
456n/a
457n/a def log_at_all_levels(self, logger):
458n/a for lvl in LEVEL_RANGE:
459n/a logger.log(lvl, self.next_message())
460n/a
461n/a def test_logger_filter(self):
462n/a # Filter at logger level.
463n/a self.root_logger.setLevel(VERBOSE)
464n/a # Levels >= 'Verbose' are good.
465n/a self.log_at_all_levels(self.root_logger)
466n/a self.assert_log_lines([
467n/a ('Verbose', '5'),
468n/a ('Sociable', '6'),
469n/a ('Effusive', '7'),
470n/a ('Terse', '8'),
471n/a ('Taciturn', '9'),
472n/a ('Silent', '10'),
473n/a ])
474n/a
475n/a def test_handler_filter(self):
476n/a # Filter at handler level.
477n/a self.root_logger.handlers[0].setLevel(SOCIABLE)
478n/a try:
479n/a # Levels >= 'Sociable' are good.
480n/a self.log_at_all_levels(self.root_logger)
481n/a self.assert_log_lines([
482n/a ('Sociable', '6'),
483n/a ('Effusive', '7'),
484n/a ('Terse', '8'),
485n/a ('Taciturn', '9'),
486n/a ('Silent', '10'),
487n/a ])
488n/a finally:
489n/a self.root_logger.handlers[0].setLevel(logging.NOTSET)
490n/a
491n/a def test_specific_filters(self):
492n/a # Set a specific filter object on the handler, and then add another
493n/a # filter object on the logger itself.
494n/a handler = self.root_logger.handlers[0]
495n/a specific_filter = None
496n/a garr = GarrulousFilter()
497n/a handler.addFilter(garr)
498n/a try:
499n/a self.log_at_all_levels(self.root_logger)
500n/a first_lines = [
501n/a # Notice how 'Garrulous' is missing
502n/a ('Boring', '1'),
503n/a ('Chatterbox', '2'),
504n/a ('Talkative', '4'),
505n/a ('Verbose', '5'),
506n/a ('Sociable', '6'),
507n/a ('Effusive', '7'),
508n/a ('Terse', '8'),
509n/a ('Taciturn', '9'),
510n/a ('Silent', '10'),
511n/a ]
512n/a self.assert_log_lines(first_lines)
513n/a
514n/a specific_filter = VerySpecificFilter()
515n/a self.root_logger.addFilter(specific_filter)
516n/a self.log_at_all_levels(self.root_logger)
517n/a self.assert_log_lines(first_lines + [
518n/a # Not only 'Garrulous' is still missing, but also 'Sociable'
519n/a # and 'Taciturn'
520n/a ('Boring', '11'),
521n/a ('Chatterbox', '12'),
522n/a ('Talkative', '14'),
523n/a ('Verbose', '15'),
524n/a ('Effusive', '17'),
525n/a ('Terse', '18'),
526n/a ('Silent', '20'),
527n/a ])
528n/a finally:
529n/a if specific_filter:
530n/a self.root_logger.removeFilter(specific_filter)
531n/a handler.removeFilter(garr)
532n/a
533n/a
534n/aclass HandlerTest(BaseTest):
535n/a def test_name(self):
536n/a h = logging.Handler()
537n/a h.name = 'generic'
538n/a self.assertEqual(h.name, 'generic')
539n/a h.name = 'anothergeneric'
540n/a self.assertEqual(h.name, 'anothergeneric')
541n/a self.assertRaises(NotImplementedError, h.emit, None)
542n/a
543n/a def test_builtin_handlers(self):
544n/a # We can't actually *use* too many handlers in the tests,
545n/a # but we can try instantiating them with various options
546n/a if sys.platform in ('linux', 'darwin'):
547n/a for existing in (True, False):
548n/a fd, fn = tempfile.mkstemp()
549n/a os.close(fd)
550n/a if not existing:
551n/a os.unlink(fn)
552n/a h = logging.handlers.WatchedFileHandler(fn, delay=True)
553n/a if existing:
554n/a dev, ino = h.dev, h.ino
555n/a self.assertEqual(dev, -1)
556n/a self.assertEqual(ino, -1)
557n/a r = logging.makeLogRecord({'msg': 'Test'})
558n/a h.handle(r)
559n/a # Now remove the file.
560n/a os.unlink(fn)
561n/a self.assertFalse(os.path.exists(fn))
562n/a # The next call should recreate the file.
563n/a h.handle(r)
564n/a self.assertTrue(os.path.exists(fn))
565n/a else:
566n/a self.assertEqual(h.dev, -1)
567n/a self.assertEqual(h.ino, -1)
568n/a h.close()
569n/a if existing:
570n/a os.unlink(fn)
571n/a if sys.platform == 'darwin':
572n/a sockname = '/var/run/syslog'
573n/a else:
574n/a sockname = '/dev/log'
575n/a try:
576n/a h = logging.handlers.SysLogHandler(sockname)
577n/a self.assertEqual(h.facility, h.LOG_USER)
578n/a self.assertTrue(h.unixsocket)
579n/a h.close()
580n/a except OSError: # syslogd might not be available
581n/a pass
582n/a for method in ('GET', 'POST', 'PUT'):
583n/a if method == 'PUT':
584n/a self.assertRaises(ValueError, logging.handlers.HTTPHandler,
585n/a 'localhost', '/log', method)
586n/a else:
587n/a h = logging.handlers.HTTPHandler('localhost', '/log', method)
588n/a h.close()
589n/a h = logging.handlers.BufferingHandler(0)
590n/a r = logging.makeLogRecord({})
591n/a self.assertTrue(h.shouldFlush(r))
592n/a h.close()
593n/a h = logging.handlers.BufferingHandler(1)
594n/a self.assertFalse(h.shouldFlush(r))
595n/a h.close()
596n/a
597n/a def test_path_objects(self):
598n/a """
599n/a Test that Path objects are accepted as filename arguments to handlers.
600n/a
601n/a See Issue #27493.
602n/a """
603n/a fd, fn = tempfile.mkstemp()
604n/a os.close(fd)
605n/a os.unlink(fn)
606n/a pfn = pathlib.Path(fn)
607n/a cases = (
608n/a (logging.FileHandler, (pfn, 'w')),
609n/a (logging.handlers.RotatingFileHandler, (pfn, 'a')),
610n/a (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
611n/a )
612n/a if sys.platform in ('linux', 'darwin'):
613n/a cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
614n/a for cls, args in cases:
615n/a h = cls(*args)
616n/a self.assertTrue(os.path.exists(fn))
617n/a h.close()
618n/a os.unlink(fn)
619n/a
620n/a @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
621n/a @unittest.skipUnless(threading, 'Threading required for this test.')
622n/a def test_race(self):
623n/a # Issue #14632 refers.
624n/a def remove_loop(fname, tries):
625n/a for _ in range(tries):
626n/a try:
627n/a os.unlink(fname)
628n/a self.deletion_time = time.time()
629n/a except OSError:
630n/a pass
631n/a time.sleep(0.004 * random.randint(0, 4))
632n/a
633n/a del_count = 500
634n/a log_count = 500
635n/a
636n/a self.handle_time = None
637n/a self.deletion_time = None
638n/a
639n/a for delay in (False, True):
640n/a fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
641n/a os.close(fd)
642n/a remover = threading.Thread(target=remove_loop, args=(fn, del_count))
643n/a remover.daemon = True
644n/a remover.start()
645n/a h = logging.handlers.WatchedFileHandler(fn, delay=delay)
646n/a f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
647n/a h.setFormatter(f)
648n/a try:
649n/a for _ in range(log_count):
650n/a time.sleep(0.005)
651n/a r = logging.makeLogRecord({'msg': 'testing' })
652n/a try:
653n/a self.handle_time = time.time()
654n/a h.handle(r)
655n/a except Exception:
656n/a print('Deleted at %s, '
657n/a 'opened at %s' % (self.deletion_time,
658n/a self.handle_time))
659n/a raise
660n/a finally:
661n/a remover.join()
662n/a h.close()
663n/a if os.path.exists(fn):
664n/a os.unlink(fn)
665n/a
666n/a
667n/aclass BadStream(object):
668n/a def write(self, data):
669n/a raise RuntimeError('deliberate mistake')
670n/a
671n/aclass TestStreamHandler(logging.StreamHandler):
672n/a def handleError(self, record):
673n/a self.error_record = record
674n/a
675n/aclass StreamHandlerTest(BaseTest):
676n/a def test_error_handling(self):
677n/a h = TestStreamHandler(BadStream())
678n/a r = logging.makeLogRecord({})
679n/a old_raise = logging.raiseExceptions
680n/a
681n/a try:
682n/a h.handle(r)
683n/a self.assertIs(h.error_record, r)
684n/a
685n/a h = logging.StreamHandler(BadStream())
686n/a with support.captured_stderr() as stderr:
687n/a h.handle(r)
688n/a msg = '\nRuntimeError: deliberate mistake\n'
689n/a self.assertIn(msg, stderr.getvalue())
690n/a
691n/a logging.raiseExceptions = False
692n/a with support.captured_stderr() as stderr:
693n/a h.handle(r)
694n/a self.assertEqual('', stderr.getvalue())
695n/a finally:
696n/a logging.raiseExceptions = old_raise
697n/a
698n/a# -- The following section could be moved into a server_helper.py module
699n/a# -- if it proves to be of wider utility than just test_logging
700n/a
701n/aif threading:
702n/a class TestSMTPServer(smtpd.SMTPServer):
703n/a """
704n/a This class implements a test SMTP server.
705n/a
706n/a :param addr: A (host, port) tuple which the server listens on.
707n/a You can specify a port value of zero: the server's
708n/a *port* attribute will hold the actual port number
709n/a used, which can be used in client connections.
710n/a :param handler: A callable which will be called to process
711n/a incoming messages. The handler will be passed
712n/a the client address tuple, who the message is from,
713n/a a list of recipients and the message data.
714n/a :param poll_interval: The interval, in seconds, used in the underlying
715n/a :func:`select` or :func:`poll` call by
716n/a :func:`asyncore.loop`.
717n/a :param sockmap: A dictionary which will be used to hold
718n/a :class:`asyncore.dispatcher` instances used by
719n/a :func:`asyncore.loop`. This avoids changing the
720n/a :mod:`asyncore` module's global state.
721n/a """
722n/a
723n/a def __init__(self, addr, handler, poll_interval, sockmap):
724n/a smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
725n/a decode_data=True)
726n/a self.port = self.socket.getsockname()[1]
727n/a self._handler = handler
728n/a self._thread = None
729n/a self.poll_interval = poll_interval
730n/a
731n/a def process_message(self, peer, mailfrom, rcpttos, data):
732n/a """
733n/a Delegates to the handler passed in to the server's constructor.
734n/a
735n/a Typically, this will be a test case method.
736n/a :param peer: The client (host, port) tuple.
737n/a :param mailfrom: The address of the sender.
738n/a :param rcpttos: The addresses of the recipients.
739n/a :param data: The message.
740n/a """
741n/a self._handler(peer, mailfrom, rcpttos, data)
742n/a
743n/a def start(self):
744n/a """
745n/a Start the server running on a separate daemon thread.
746n/a """
747n/a self._thread = t = threading.Thread(target=self.serve_forever,
748n/a args=(self.poll_interval,))
749n/a t.setDaemon(True)
750n/a t.start()
751n/a
752n/a def serve_forever(self, poll_interval):
753n/a """
754n/a Run the :mod:`asyncore` loop until normal termination
755n/a conditions arise.
756n/a :param poll_interval: The interval, in seconds, used in the underlying
757n/a :func:`select` or :func:`poll` call by
758n/a :func:`asyncore.loop`.
759n/a """
760n/a try:
761n/a asyncore.loop(poll_interval, map=self._map)
762n/a except OSError:
763n/a # On FreeBSD 8, closing the server repeatably
764n/a # raises this error. We swallow it if the
765n/a # server has been closed.
766n/a if self.connected or self.accepting:
767n/a raise
768n/a
769n/a def stop(self, timeout=None):
770n/a """
771n/a Stop the thread by closing the server instance.
772n/a Wait for the server thread to terminate.
773n/a
774n/a :param timeout: How long to wait for the server thread
775n/a to terminate.
776n/a """
777n/a self.close()
778n/a self._thread.join(timeout)
779n/a self._thread = None
780n/a
781n/a class ControlMixin(object):
782n/a """
783n/a This mixin is used to start a server on a separate thread, and
784n/a shut it down programmatically. Request handling is simplified - instead
785n/a of needing to derive a suitable RequestHandler subclass, you just
786n/a provide a callable which will be passed each received request to be
787n/a processed.
788n/a
789n/a :param handler: A handler callable which will be called with a
790n/a single parameter - the request - in order to
791n/a process the request. This handler is called on the
792n/a server thread, effectively meaning that requests are
793n/a processed serially. While not quite Web scale ;-),
794n/a this should be fine for testing applications.
795n/a :param poll_interval: The polling interval in seconds.
796n/a """
797n/a def __init__(self, handler, poll_interval):
798n/a self._thread = None
799n/a self.poll_interval = poll_interval
800n/a self._handler = handler
801n/a self.ready = threading.Event()
802n/a
803n/a def start(self):
804n/a """
805n/a Create a daemon thread to run the server, and start it.
806n/a """
807n/a self._thread = t = threading.Thread(target=self.serve_forever,
808n/a args=(self.poll_interval,))
809n/a t.setDaemon(True)
810n/a t.start()
811n/a
812n/a def serve_forever(self, poll_interval):
813n/a """
814n/a Run the server. Set the ready flag before entering the
815n/a service loop.
816n/a """
817n/a self.ready.set()
818n/a super(ControlMixin, self).serve_forever(poll_interval)
819n/a
820n/a def stop(self, timeout=None):
821n/a """
822n/a Tell the server thread to stop, and wait for it to do so.
823n/a
824n/a :param timeout: How long to wait for the server thread
825n/a to terminate.
826n/a """
827n/a self.shutdown()
828n/a if self._thread is not None:
829n/a self._thread.join(timeout)
830n/a self._thread = None
831n/a self.server_close()
832n/a self.ready.clear()
833n/a
834n/a class TestHTTPServer(ControlMixin, HTTPServer):
835n/a """
836n/a An HTTP server which is controllable using :class:`ControlMixin`.
837n/a
838n/a :param addr: A tuple with the IP address and port to listen on.
839n/a :param handler: A handler callable which will be called with a
840n/a single parameter - the request - in order to
841n/a process the request.
842n/a :param poll_interval: The polling interval in seconds.
843n/a :param log: Pass ``True`` to enable log messages.
844n/a """
845n/a def __init__(self, addr, handler, poll_interval=0.5,
846n/a log=False, sslctx=None):
847n/a class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
848n/a def __getattr__(self, name, default=None):
849n/a if name.startswith('do_'):
850n/a return self.process_request
851n/a raise AttributeError(name)
852n/a
853n/a def process_request(self):
854n/a self.server._handler(self)
855n/a
856n/a def log_message(self, format, *args):
857n/a if log:
858n/a super(DelegatingHTTPRequestHandler,
859n/a self).log_message(format, *args)
860n/a HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
861n/a ControlMixin.__init__(self, handler, poll_interval)
862n/a self.sslctx = sslctx
863n/a
864n/a def get_request(self):
865n/a try:
866n/a sock, addr = self.socket.accept()
867n/a if self.sslctx:
868n/a sock = self.sslctx.wrap_socket(sock, server_side=True)
869n/a except OSError as e:
870n/a # socket errors are silenced by the caller, print them here
871n/a sys.stderr.write("Got an error:\n%s\n" % e)
872n/a raise
873n/a return sock, addr
874n/a
875n/a class TestTCPServer(ControlMixin, ThreadingTCPServer):
876n/a """
877n/a A TCP server which is controllable using :class:`ControlMixin`.
878n/a
879n/a :param addr: A tuple with the IP address and port to listen on.
880n/a :param handler: A handler callable which will be called with a single
881n/a parameter - the request - in order to process the request.
882n/a :param poll_interval: The polling interval in seconds.
883n/a :bind_and_activate: If True (the default), binds the server and starts it
884n/a listening. If False, you need to call
885n/a :meth:`server_bind` and :meth:`server_activate` at
886n/a some later time before calling :meth:`start`, so that
887n/a the server will set up the socket and listen on it.
888n/a """
889n/a
890n/a allow_reuse_address = True
891n/a
892n/a def __init__(self, addr, handler, poll_interval=0.5,
893n/a bind_and_activate=True):
894n/a class DelegatingTCPRequestHandler(StreamRequestHandler):
895n/a
896n/a def handle(self):
897n/a self.server._handler(self)
898n/a ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
899n/a bind_and_activate)
900n/a ControlMixin.__init__(self, handler, poll_interval)
901n/a
902n/a def server_bind(self):
903n/a super(TestTCPServer, self).server_bind()
904n/a self.port = self.socket.getsockname()[1]
905n/a
906n/a class TestUDPServer(ControlMixin, ThreadingUDPServer):
907n/a """
908n/a A UDP server which is controllable using :class:`ControlMixin`.
909n/a
910n/a :param addr: A tuple with the IP address and port to listen on.
911n/a :param handler: A handler callable which will be called with a
912n/a single parameter - the request - in order to
913n/a process the request.
914n/a :param poll_interval: The polling interval for shutdown requests,
915n/a in seconds.
916n/a :bind_and_activate: If True (the default), binds the server and
917n/a starts it listening. If False, you need to
918n/a call :meth:`server_bind` and
919n/a :meth:`server_activate` at some later time
920n/a before calling :meth:`start`, so that the server will
921n/a set up the socket and listen on it.
922n/a """
923n/a def __init__(self, addr, handler, poll_interval=0.5,
924n/a bind_and_activate=True):
925n/a class DelegatingUDPRequestHandler(DatagramRequestHandler):
926n/a
927n/a def handle(self):
928n/a self.server._handler(self)
929n/a
930n/a def finish(self):
931n/a data = self.wfile.getvalue()
932n/a if data:
933n/a try:
934n/a super(DelegatingUDPRequestHandler, self).finish()
935n/a except OSError:
936n/a if not self.server._closed:
937n/a raise
938n/a
939n/a ThreadingUDPServer.__init__(self, addr,
940n/a DelegatingUDPRequestHandler,
941n/a bind_and_activate)
942n/a ControlMixin.__init__(self, handler, poll_interval)
943n/a self._closed = False
944n/a
945n/a def server_bind(self):
946n/a super(TestUDPServer, self).server_bind()
947n/a self.port = self.socket.getsockname()[1]
948n/a
949n/a def server_close(self):
950n/a super(TestUDPServer, self).server_close()
951n/a self._closed = True
952n/a
953n/a if hasattr(socket, "AF_UNIX"):
954n/a class TestUnixStreamServer(TestTCPServer):
955n/a address_family = socket.AF_UNIX
956n/a
957n/a class TestUnixDatagramServer(TestUDPServer):
958n/a address_family = socket.AF_UNIX
959n/a
960n/a# - end of server_helper section
961n/a
962n/a@unittest.skipUnless(threading, 'Threading required for this test.')
963n/aclass SMTPHandlerTest(BaseTest):
964n/a TIMEOUT = 8.0
965n/a def test_basic(self):
966n/a sockmap = {}
967n/a server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
968n/a sockmap)
969n/a server.start()
970n/a addr = (support.HOST, server.port)
971n/a h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
972n/a timeout=self.TIMEOUT)
973n/a self.assertEqual(h.toaddrs, ['you'])
974n/a self.messages = []
975n/a r = logging.makeLogRecord({'msg': 'Hello \u2713'})
976n/a self.handled = threading.Event()
977n/a h.handle(r)
978n/a self.handled.wait(self.TIMEOUT) # 14314: don't wait forever
979n/a server.stop()
980n/a self.assertTrue(self.handled.is_set())
981n/a self.assertEqual(len(self.messages), 1)
982n/a peer, mailfrom, rcpttos, data = self.messages[0]
983n/a self.assertEqual(mailfrom, 'me')
984n/a self.assertEqual(rcpttos, ['you'])
985n/a self.assertIn('\nSubject: Log\n', data)
986n/a self.assertTrue(data.endswith('\n\nHello \u2713'))
987n/a h.close()
988n/a
989n/a def process_message(self, *args):
990n/a self.messages.append(args)
991n/a self.handled.set()
992n/a
993n/aclass MemoryHandlerTest(BaseTest):
994n/a
995n/a """Tests for the MemoryHandler."""
996n/a
997n/a # Do not bother with a logger name group.
998n/a expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
999n/a
1000n/a def setUp(self):
1001n/a BaseTest.setUp(self)
1002n/a self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1003n/a self.root_hdlr)
1004n/a self.mem_logger = logging.getLogger('mem')
1005n/a self.mem_logger.propagate = 0
1006n/a self.mem_logger.addHandler(self.mem_hdlr)
1007n/a
1008n/a def tearDown(self):
1009n/a self.mem_hdlr.close()
1010n/a BaseTest.tearDown(self)
1011n/a
1012n/a def test_flush(self):
1013n/a # The memory handler flushes to its target handler based on specific
1014n/a # criteria (message count and message level).
1015n/a self.mem_logger.debug(self.next_message())
1016n/a self.assert_log_lines([])
1017n/a self.mem_logger.info(self.next_message())
1018n/a self.assert_log_lines([])
1019n/a # This will flush because the level is >= logging.WARNING
1020n/a self.mem_logger.warning(self.next_message())
1021n/a lines = [
1022n/a ('DEBUG', '1'),
1023n/a ('INFO', '2'),
1024n/a ('WARNING', '3'),
1025n/a ]
1026n/a self.assert_log_lines(lines)
1027n/a for n in (4, 14):
1028n/a for i in range(9):
1029n/a self.mem_logger.debug(self.next_message())
1030n/a self.assert_log_lines(lines)
1031n/a # This will flush because it's the 10th message since the last
1032n/a # flush.
1033n/a self.mem_logger.debug(self.next_message())
1034n/a lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1035n/a self.assert_log_lines(lines)
1036n/a
1037n/a self.mem_logger.debug(self.next_message())
1038n/a self.assert_log_lines(lines)
1039n/a
1040n/a def test_flush_on_close(self):
1041n/a """
1042n/a Test that the flush-on-close configuration works as expected.
1043n/a """
1044n/a self.mem_logger.debug(self.next_message())
1045n/a self.assert_log_lines([])
1046n/a self.mem_logger.info(self.next_message())
1047n/a self.assert_log_lines([])
1048n/a self.mem_logger.removeHandler(self.mem_hdlr)
1049n/a # Default behaviour is to flush on close. Check that it happens.
1050n/a self.mem_hdlr.close()
1051n/a lines = [
1052n/a ('DEBUG', '1'),
1053n/a ('INFO', '2'),
1054n/a ]
1055n/a self.assert_log_lines(lines)
1056n/a # Now configure for flushing not to be done on close.
1057n/a self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1058n/a self.root_hdlr,
1059n/a False)
1060n/a self.mem_logger.addHandler(self.mem_hdlr)
1061n/a self.mem_logger.debug(self.next_message())
1062n/a self.assert_log_lines(lines) # no change
1063n/a self.mem_logger.info(self.next_message())
1064n/a self.assert_log_lines(lines) # no change
1065n/a self.mem_logger.removeHandler(self.mem_hdlr)
1066n/a self.mem_hdlr.close()
1067n/a # assert that no new lines have been added
1068n/a self.assert_log_lines(lines) # no change
1069n/a
1070n/a
1071n/aclass ExceptionFormatter(logging.Formatter):
1072n/a """A special exception formatter."""
1073n/a def formatException(self, ei):
1074n/a return "Got a [%s]" % ei[0].__name__
1075n/a
1076n/a
1077n/aclass ConfigFileTest(BaseTest):
1078n/a
1079n/a """Reading logging config from a .ini-style config file."""
1080n/a
1081n/a expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1082n/a
1083n/a # config0 is a standard configuration.
1084n/a config0 = """
1085n/a [loggers]
1086n/a keys=root
1087n/a
1088n/a [handlers]
1089n/a keys=hand1
1090n/a
1091n/a [formatters]
1092n/a keys=form1
1093n/a
1094n/a [logger_root]
1095n/a level=WARNING
1096n/a handlers=hand1
1097n/a
1098n/a [handler_hand1]
1099n/a class=StreamHandler
1100n/a level=NOTSET
1101n/a formatter=form1
1102n/a args=(sys.stdout,)
1103n/a
1104n/a [formatter_form1]
1105n/a format=%(levelname)s ++ %(message)s
1106n/a datefmt=
1107n/a """
1108n/a
1109n/a # config1 adds a little to the standard configuration.
1110n/a config1 = """
1111n/a [loggers]
1112n/a keys=root,parser
1113n/a
1114n/a [handlers]
1115n/a keys=hand1
1116n/a
1117n/a [formatters]
1118n/a keys=form1
1119n/a
1120n/a [logger_root]
1121n/a level=WARNING
1122n/a handlers=
1123n/a
1124n/a [logger_parser]
1125n/a level=DEBUG
1126n/a handlers=hand1
1127n/a propagate=1
1128n/a qualname=compiler.parser
1129n/a
1130n/a [handler_hand1]
1131n/a class=StreamHandler
1132n/a level=NOTSET
1133n/a formatter=form1
1134n/a args=(sys.stdout,)
1135n/a
1136n/a [formatter_form1]
1137n/a format=%(levelname)s ++ %(message)s
1138n/a datefmt=
1139n/a """
1140n/a
1141n/a # config1a moves the handler to the root.
1142n/a config1a = """
1143n/a [loggers]
1144n/a keys=root,parser
1145n/a
1146n/a [handlers]
1147n/a keys=hand1
1148n/a
1149n/a [formatters]
1150n/a keys=form1
1151n/a
1152n/a [logger_root]
1153n/a level=WARNING
1154n/a handlers=hand1
1155n/a
1156n/a [logger_parser]
1157n/a level=DEBUG
1158n/a handlers=
1159n/a propagate=1
1160n/a qualname=compiler.parser
1161n/a
1162n/a [handler_hand1]
1163n/a class=StreamHandler
1164n/a level=NOTSET
1165n/a formatter=form1
1166n/a args=(sys.stdout,)
1167n/a
1168n/a [formatter_form1]
1169n/a format=%(levelname)s ++ %(message)s
1170n/a datefmt=
1171n/a """
1172n/a
1173n/a # config2 has a subtle configuration error that should be reported
1174n/a config2 = config1.replace("sys.stdout", "sys.stbout")
1175n/a
1176n/a # config3 has a less subtle configuration error
1177n/a config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1178n/a
1179n/a # config4 specifies a custom formatter class to be loaded
1180n/a config4 = """
1181n/a [loggers]
1182n/a keys=root
1183n/a
1184n/a [handlers]
1185n/a keys=hand1
1186n/a
1187n/a [formatters]
1188n/a keys=form1
1189n/a
1190n/a [logger_root]
1191n/a level=NOTSET
1192n/a handlers=hand1
1193n/a
1194n/a [handler_hand1]
1195n/a class=StreamHandler
1196n/a level=NOTSET
1197n/a formatter=form1
1198n/a args=(sys.stdout,)
1199n/a
1200n/a [formatter_form1]
1201n/a class=""" + __name__ + """.ExceptionFormatter
1202n/a format=%(levelname)s:%(name)s:%(message)s
1203n/a datefmt=
1204n/a """
1205n/a
1206n/a # config5 specifies a custom handler class to be loaded
1207n/a config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1208n/a
1209n/a # config6 uses ', ' delimiters in the handlers and formatters sections
1210n/a config6 = """
1211n/a [loggers]
1212n/a keys=root,parser
1213n/a
1214n/a [handlers]
1215n/a keys=hand1, hand2
1216n/a
1217n/a [formatters]
1218n/a keys=form1, form2
1219n/a
1220n/a [logger_root]
1221n/a level=WARNING
1222n/a handlers=
1223n/a
1224n/a [logger_parser]
1225n/a level=DEBUG
1226n/a handlers=hand1
1227n/a propagate=1
1228n/a qualname=compiler.parser
1229n/a
1230n/a [handler_hand1]
1231n/a class=StreamHandler
1232n/a level=NOTSET
1233n/a formatter=form1
1234n/a args=(sys.stdout,)
1235n/a
1236n/a [handler_hand2]
1237n/a class=StreamHandler
1238n/a level=NOTSET
1239n/a formatter=form1
1240n/a args=(sys.stderr,)
1241n/a
1242n/a [formatter_form1]
1243n/a format=%(levelname)s ++ %(message)s
1244n/a datefmt=
1245n/a
1246n/a [formatter_form2]
1247n/a format=%(message)s
1248n/a datefmt=
1249n/a """
1250n/a
1251n/a # config7 adds a compiler logger.
1252n/a config7 = """
1253n/a [loggers]
1254n/a keys=root,parser,compiler
1255n/a
1256n/a [handlers]
1257n/a keys=hand1
1258n/a
1259n/a [formatters]
1260n/a keys=form1
1261n/a
1262n/a [logger_root]
1263n/a level=WARNING
1264n/a handlers=hand1
1265n/a
1266n/a [logger_compiler]
1267n/a level=DEBUG
1268n/a handlers=
1269n/a propagate=1
1270n/a qualname=compiler
1271n/a
1272n/a [logger_parser]
1273n/a level=DEBUG
1274n/a handlers=
1275n/a propagate=1
1276n/a qualname=compiler.parser
1277n/a
1278n/a [handler_hand1]
1279n/a class=StreamHandler
1280n/a level=NOTSET
1281n/a formatter=form1
1282n/a args=(sys.stdout,)
1283n/a
1284n/a [formatter_form1]
1285n/a format=%(levelname)s ++ %(message)s
1286n/a datefmt=
1287n/a """
1288n/a
1289n/a disable_test = """
1290n/a [loggers]
1291n/a keys=root
1292n/a
1293n/a [handlers]
1294n/a keys=screen
1295n/a
1296n/a [formatters]
1297n/a keys=
1298n/a
1299n/a [logger_root]
1300n/a level=DEBUG
1301n/a handlers=screen
1302n/a
1303n/a [handler_screen]
1304n/a level=DEBUG
1305n/a class=StreamHandler
1306n/a args=(sys.stdout,)
1307n/a formatter=
1308n/a """
1309n/a
1310n/a def apply_config(self, conf, **kwargs):
1311n/a file = io.StringIO(textwrap.dedent(conf))
1312n/a logging.config.fileConfig(file, **kwargs)
1313n/a
1314n/a def test_config0_ok(self):
1315n/a # A simple config file which overrides the default settings.
1316n/a with support.captured_stdout() as output:
1317n/a self.apply_config(self.config0)
1318n/a logger = logging.getLogger()
1319n/a # Won't output anything
1320n/a logger.info(self.next_message())
1321n/a # Outputs a message
1322n/a logger.error(self.next_message())
1323n/a self.assert_log_lines([
1324n/a ('ERROR', '2'),
1325n/a ], stream=output)
1326n/a # Original logger output is empty.
1327n/a self.assert_log_lines([])
1328n/a
1329n/a def test_config0_using_cp_ok(self):
1330n/a # A simple config file which overrides the default settings.
1331n/a with support.captured_stdout() as output:
1332n/a file = io.StringIO(textwrap.dedent(self.config0))
1333n/a cp = configparser.ConfigParser()
1334n/a cp.read_file(file)
1335n/a logging.config.fileConfig(cp)
1336n/a logger = logging.getLogger()
1337n/a # Won't output anything
1338n/a logger.info(self.next_message())
1339n/a # Outputs a message
1340n/a logger.error(self.next_message())
1341n/a self.assert_log_lines([
1342n/a ('ERROR', '2'),
1343n/a ], stream=output)
1344n/a # Original logger output is empty.
1345n/a self.assert_log_lines([])
1346n/a
1347n/a def test_config1_ok(self, config=config1):
1348n/a # A config file defining a sub-parser as well.
1349n/a with support.captured_stdout() as output:
1350n/a self.apply_config(config)
1351n/a logger = logging.getLogger("compiler.parser")
1352n/a # Both will output a message
1353n/a logger.info(self.next_message())
1354n/a logger.error(self.next_message())
1355n/a self.assert_log_lines([
1356n/a ('INFO', '1'),
1357n/a ('ERROR', '2'),
1358n/a ], stream=output)
1359n/a # Original logger output is empty.
1360n/a self.assert_log_lines([])
1361n/a
1362n/a def test_config2_failure(self):
1363n/a # A simple config file which overrides the default settings.
1364n/a self.assertRaises(Exception, self.apply_config, self.config2)
1365n/a
1366n/a def test_config3_failure(self):
1367n/a # A simple config file which overrides the default settings.
1368n/a self.assertRaises(Exception, self.apply_config, self.config3)
1369n/a
1370n/a def test_config4_ok(self):
1371n/a # A config file specifying a custom formatter class.
1372n/a with support.captured_stdout() as output:
1373n/a self.apply_config(self.config4)
1374n/a logger = logging.getLogger()
1375n/a try:
1376n/a raise RuntimeError()
1377n/a except RuntimeError:
1378n/a logging.exception("just testing")
1379n/a sys.stdout.seek(0)
1380n/a self.assertEqual(output.getvalue(),
1381n/a "ERROR:root:just testing\nGot a [RuntimeError]\n")
1382n/a # Original logger output is empty
1383n/a self.assert_log_lines([])
1384n/a
1385n/a def test_config5_ok(self):
1386n/a self.test_config1_ok(config=self.config5)
1387n/a
1388n/a def test_config6_ok(self):
1389n/a self.test_config1_ok(config=self.config6)
1390n/a
1391n/a def test_config7_ok(self):
1392n/a with support.captured_stdout() as output:
1393n/a self.apply_config(self.config1a)
1394n/a logger = logging.getLogger("compiler.parser")
1395n/a # See issue #11424. compiler-hyphenated sorts
1396n/a # between compiler and compiler.xyz and this
1397n/a # was preventing compiler.xyz from being included
1398n/a # in the child loggers of compiler because of an
1399n/a # overzealous loop termination condition.
1400n/a hyphenated = logging.getLogger('compiler-hyphenated')
1401n/a # All will output a message
1402n/a logger.info(self.next_message())
1403n/a logger.error(self.next_message())
1404n/a hyphenated.critical(self.next_message())
1405n/a self.assert_log_lines([
1406n/a ('INFO', '1'),
1407n/a ('ERROR', '2'),
1408n/a ('CRITICAL', '3'),
1409n/a ], stream=output)
1410n/a # Original logger output is empty.
1411n/a self.assert_log_lines([])
1412n/a with support.captured_stdout() as output:
1413n/a self.apply_config(self.config7)
1414n/a logger = logging.getLogger("compiler.parser")
1415n/a self.assertFalse(logger.disabled)
1416n/a # Both will output a message
1417n/a logger.info(self.next_message())
1418n/a logger.error(self.next_message())
1419n/a logger = logging.getLogger("compiler.lexer")
1420n/a # Both will output a message
1421n/a logger.info(self.next_message())
1422n/a logger.error(self.next_message())
1423n/a # Will not appear
1424n/a hyphenated.critical(self.next_message())
1425n/a self.assert_log_lines([
1426n/a ('INFO', '4'),
1427n/a ('ERROR', '5'),
1428n/a ('INFO', '6'),
1429n/a ('ERROR', '7'),
1430n/a ], stream=output)
1431n/a # Original logger output is empty.
1432n/a self.assert_log_lines([])
1433n/a
1434n/a def test_logger_disabling(self):
1435n/a self.apply_config(self.disable_test)
1436n/a logger = logging.getLogger('some_pristine_logger')
1437n/a self.assertFalse(logger.disabled)
1438n/a self.apply_config(self.disable_test)
1439n/a self.assertTrue(logger.disabled)
1440n/a self.apply_config(self.disable_test, disable_existing_loggers=False)
1441n/a self.assertFalse(logger.disabled)
1442n/a
1443n/a
1444n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1445n/aclass SocketHandlerTest(BaseTest):
1446n/a
1447n/a """Test for SocketHandler objects."""
1448n/a
1449n/a if threading:
1450n/a server_class = TestTCPServer
1451n/a address = ('localhost', 0)
1452n/a
1453n/a def setUp(self):
1454n/a """Set up a TCP server to receive log messages, and a SocketHandler
1455n/a pointing to that server's address and port."""
1456n/a BaseTest.setUp(self)
1457n/a # Issue #29177: deal with errors that happen during setup
1458n/a self.server = self.sock_hdlr = self.server_exception = None
1459n/a try:
1460n/a self.server = server = self.server_class(self.address,
1461n/a self.handle_socket, 0.01)
1462n/a server.start()
1463n/a # Uncomment next line to test error recovery in setUp()
1464n/a # raise OSError('dummy error raised')
1465n/a except OSError as e:
1466n/a self.server_exception = e
1467n/a return
1468n/a server.ready.wait()
1469n/a hcls = logging.handlers.SocketHandler
1470n/a if isinstance(server.server_address, tuple):
1471n/a self.sock_hdlr = hcls('localhost', server.port)
1472n/a else:
1473n/a self.sock_hdlr = hcls(server.server_address, None)
1474n/a self.log_output = ''
1475n/a self.root_logger.removeHandler(self.root_logger.handlers[0])
1476n/a self.root_logger.addHandler(self.sock_hdlr)
1477n/a self.handled = threading.Semaphore(0)
1478n/a
1479n/a def tearDown(self):
1480n/a """Shutdown the TCP server."""
1481n/a try:
1482n/a if self.server:
1483n/a self.server.stop(2.0)
1484n/a if self.sock_hdlr:
1485n/a self.root_logger.removeHandler(self.sock_hdlr)
1486n/a self.sock_hdlr.close()
1487n/a finally:
1488n/a BaseTest.tearDown(self)
1489n/a
1490n/a def handle_socket(self, request):
1491n/a conn = request.connection
1492n/a while True:
1493n/a chunk = conn.recv(4)
1494n/a if len(chunk) < 4:
1495n/a break
1496n/a slen = struct.unpack(">L", chunk)[0]
1497n/a chunk = conn.recv(slen)
1498n/a while len(chunk) < slen:
1499n/a chunk = chunk + conn.recv(slen - len(chunk))
1500n/a obj = pickle.loads(chunk)
1501n/a record = logging.makeLogRecord(obj)
1502n/a self.log_output += record.msg + '\n'
1503n/a self.handled.release()
1504n/a
1505n/a def test_output(self):
1506n/a # The log message sent to the SocketHandler is properly received.
1507n/a if self.server_exception:
1508n/a self.skipTest(self.server_exception)
1509n/a logger = logging.getLogger("tcp")
1510n/a logger.error("spam")
1511n/a self.handled.acquire()
1512n/a logger.debug("eggs")
1513n/a self.handled.acquire()
1514n/a self.assertEqual(self.log_output, "spam\neggs\n")
1515n/a
1516n/a def test_noserver(self):
1517n/a if self.server_exception:
1518n/a self.skipTest(self.server_exception)
1519n/a # Avoid timing-related failures due to SocketHandler's own hard-wired
1520n/a # one-second timeout on socket.create_connection() (issue #16264).
1521n/a self.sock_hdlr.retryStart = 2.5
1522n/a # Kill the server
1523n/a self.server.stop(2.0)
1524n/a # The logging call should try to connect, which should fail
1525n/a try:
1526n/a raise RuntimeError('Deliberate mistake')
1527n/a except RuntimeError:
1528n/a self.root_logger.exception('Never sent')
1529n/a self.root_logger.error('Never sent, either')
1530n/a now = time.time()
1531n/a self.assertGreater(self.sock_hdlr.retryTime, now)
1532n/a time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1533n/a self.root_logger.error('Nor this')
1534n/a
1535n/adef _get_temp_domain_socket():
1536n/a fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1537n/a os.close(fd)
1538n/a # just need a name - file can't be present, or we'll get an
1539n/a # 'address already in use' error.
1540n/a os.remove(fn)
1541n/a return fn
1542n/a
1543n/a@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1544n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1545n/aclass UnixSocketHandlerTest(SocketHandlerTest):
1546n/a
1547n/a """Test for SocketHandler with unix sockets."""
1548n/a
1549n/a if threading and hasattr(socket, "AF_UNIX"):
1550n/a server_class = TestUnixStreamServer
1551n/a
1552n/a def setUp(self):
1553n/a # override the definition in the base class
1554n/a self.address = _get_temp_domain_socket()
1555n/a SocketHandlerTest.setUp(self)
1556n/a
1557n/a def tearDown(self):
1558n/a SocketHandlerTest.tearDown(self)
1559n/a support.unlink(self.address)
1560n/a
1561n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1562n/aclass DatagramHandlerTest(BaseTest):
1563n/a
1564n/a """Test for DatagramHandler."""
1565n/a
1566n/a if threading:
1567n/a server_class = TestUDPServer
1568n/a address = ('localhost', 0)
1569n/a
1570n/a def setUp(self):
1571n/a """Set up a UDP server to receive log messages, and a DatagramHandler
1572n/a pointing to that server's address and port."""
1573n/a BaseTest.setUp(self)
1574n/a # Issue #29177: deal with errors that happen during setup
1575n/a self.server = self.sock_hdlr = self.server_exception = None
1576n/a try:
1577n/a self.server = server = self.server_class(self.address,
1578n/a self.handle_datagram, 0.01)
1579n/a server.start()
1580n/a # Uncomment next line to test error recovery in setUp()
1581n/a # raise OSError('dummy error raised')
1582n/a except OSError as e:
1583n/a self.server_exception = e
1584n/a return
1585n/a server.ready.wait()
1586n/a hcls = logging.handlers.DatagramHandler
1587n/a if isinstance(server.server_address, tuple):
1588n/a self.sock_hdlr = hcls('localhost', server.port)
1589n/a else:
1590n/a self.sock_hdlr = hcls(server.server_address, None)
1591n/a self.log_output = ''
1592n/a self.root_logger.removeHandler(self.root_logger.handlers[0])
1593n/a self.root_logger.addHandler(self.sock_hdlr)
1594n/a self.handled = threading.Event()
1595n/a
1596n/a def tearDown(self):
1597n/a """Shutdown the UDP server."""
1598n/a try:
1599n/a if self.server:
1600n/a self.server.stop(2.0)
1601n/a if self.sock_hdlr:
1602n/a self.root_logger.removeHandler(self.sock_hdlr)
1603n/a self.sock_hdlr.close()
1604n/a finally:
1605n/a BaseTest.tearDown(self)
1606n/a
1607n/a def handle_datagram(self, request):
1608n/a slen = struct.pack('>L', 0) # length of prefix
1609n/a packet = request.packet[len(slen):]
1610n/a obj = pickle.loads(packet)
1611n/a record = logging.makeLogRecord(obj)
1612n/a self.log_output += record.msg + '\n'
1613n/a self.handled.set()
1614n/a
1615n/a def test_output(self):
1616n/a # The log message sent to the DatagramHandler is properly received.
1617n/a if self.server_exception:
1618n/a self.skipTest(self.server_exception)
1619n/a logger = logging.getLogger("udp")
1620n/a logger.error("spam")
1621n/a self.handled.wait()
1622n/a self.handled.clear()
1623n/a logger.error("eggs")
1624n/a self.handled.wait()
1625n/a self.assertEqual(self.log_output, "spam\neggs\n")
1626n/a
1627n/a@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1628n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1629n/aclass UnixDatagramHandlerTest(DatagramHandlerTest):
1630n/a
1631n/a """Test for DatagramHandler using Unix sockets."""
1632n/a
1633n/a if threading and hasattr(socket, "AF_UNIX"):
1634n/a server_class = TestUnixDatagramServer
1635n/a
1636n/a def setUp(self):
1637n/a # override the definition in the base class
1638n/a self.address = _get_temp_domain_socket()
1639n/a DatagramHandlerTest.setUp(self)
1640n/a
1641n/a def tearDown(self):
1642n/a DatagramHandlerTest.tearDown(self)
1643n/a support.unlink(self.address)
1644n/a
1645n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1646n/aclass SysLogHandlerTest(BaseTest):
1647n/a
1648n/a """Test for SysLogHandler using UDP."""
1649n/a
1650n/a if threading:
1651n/a server_class = TestUDPServer
1652n/a address = ('localhost', 0)
1653n/a
1654n/a def setUp(self):
1655n/a """Set up a UDP server to receive log messages, and a SysLogHandler
1656n/a pointing to that server's address and port."""
1657n/a BaseTest.setUp(self)
1658n/a # Issue #29177: deal with errors that happen during setup
1659n/a self.server = self.sl_hdlr = self.server_exception = None
1660n/a try:
1661n/a self.server = server = self.server_class(self.address,
1662n/a self.handle_datagram, 0.01)
1663n/a server.start()
1664n/a # Uncomment next line to test error recovery in setUp()
1665n/a # raise OSError('dummy error raised')
1666n/a except OSError as e:
1667n/a self.server_exception = e
1668n/a return
1669n/a server.ready.wait()
1670n/a hcls = logging.handlers.SysLogHandler
1671n/a if isinstance(server.server_address, tuple):
1672n/a self.sl_hdlr = hcls(('localhost', server.port))
1673n/a else:
1674n/a self.sl_hdlr = hcls(server.server_address)
1675n/a self.log_output = ''
1676n/a self.root_logger.removeHandler(self.root_logger.handlers[0])
1677n/a self.root_logger.addHandler(self.sl_hdlr)
1678n/a self.handled = threading.Event()
1679n/a
1680n/a def tearDown(self):
1681n/a """Shutdown the server."""
1682n/a try:
1683n/a if self.server:
1684n/a self.server.stop(2.0)
1685n/a if self.sl_hdlr:
1686n/a self.root_logger.removeHandler(self.sl_hdlr)
1687n/a self.sl_hdlr.close()
1688n/a finally:
1689n/a BaseTest.tearDown(self)
1690n/a
1691n/a def handle_datagram(self, request):
1692n/a self.log_output = request.packet
1693n/a self.handled.set()
1694n/a
1695n/a def test_output(self):
1696n/a if self.server_exception:
1697n/a self.skipTest(self.server_exception)
1698n/a # The log message sent to the SysLogHandler is properly received.
1699n/a logger = logging.getLogger("slh")
1700n/a logger.error("sp\xe4m")
1701n/a self.handled.wait()
1702n/a self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1703n/a self.handled.clear()
1704n/a self.sl_hdlr.append_nul = False
1705n/a logger.error("sp\xe4m")
1706n/a self.handled.wait()
1707n/a self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1708n/a self.handled.clear()
1709n/a self.sl_hdlr.ident = "h\xe4m-"
1710n/a logger.error("sp\xe4m")
1711n/a self.handled.wait()
1712n/a self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1713n/a
1714n/a@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1715n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1716n/aclass UnixSysLogHandlerTest(SysLogHandlerTest):
1717n/a
1718n/a """Test for SysLogHandler with Unix sockets."""
1719n/a
1720n/a if threading and hasattr(socket, "AF_UNIX"):
1721n/a server_class = TestUnixDatagramServer
1722n/a
1723n/a def setUp(self):
1724n/a # override the definition in the base class
1725n/a self.address = _get_temp_domain_socket()
1726n/a SysLogHandlerTest.setUp(self)
1727n/a
1728n/a def tearDown(self):
1729n/a SysLogHandlerTest.tearDown(self)
1730n/a support.unlink(self.address)
1731n/a
1732n/a@unittest.skipUnless(threading, 'Threading required for this test.')
1733n/aclass HTTPHandlerTest(BaseTest):
1734n/a """Test for HTTPHandler."""
1735n/a
1736n/a def setUp(self):
1737n/a """Set up an HTTP server to receive log messages, and a HTTPHandler
1738n/a pointing to that server's address and port."""
1739n/a BaseTest.setUp(self)
1740n/a self.handled = threading.Event()
1741n/a
1742n/a def handle_request(self, request):
1743n/a self.command = request.command
1744n/a self.log_data = urlparse(request.path)
1745n/a if self.command == 'POST':
1746n/a try:
1747n/a rlen = int(request.headers['Content-Length'])
1748n/a self.post_data = request.rfile.read(rlen)
1749n/a except:
1750n/a self.post_data = None
1751n/a request.send_response(200)
1752n/a request.end_headers()
1753n/a self.handled.set()
1754n/a
1755n/a def test_output(self):
1756n/a # The log message sent to the HTTPHandler is properly received.
1757n/a logger = logging.getLogger("http")
1758n/a root_logger = self.root_logger
1759n/a root_logger.removeHandler(self.root_logger.handlers[0])
1760n/a for secure in (False, True):
1761n/a addr = ('localhost', 0)
1762n/a if secure:
1763n/a try:
1764n/a import ssl
1765n/a except ImportError:
1766n/a sslctx = None
1767n/a else:
1768n/a here = os.path.dirname(__file__)
1769n/a localhost_cert = os.path.join(here, "keycert.pem")
1770n/a sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1771n/a sslctx.load_cert_chain(localhost_cert)
1772n/a
1773n/a context = ssl.create_default_context(cafile=localhost_cert)
1774n/a else:
1775n/a sslctx = None
1776n/a context = None
1777n/a self.server = server = TestHTTPServer(addr, self.handle_request,
1778n/a 0.01, sslctx=sslctx)
1779n/a server.start()
1780n/a server.ready.wait()
1781n/a host = 'localhost:%d' % server.server_port
1782n/a secure_client = secure and sslctx
1783n/a self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
1784n/a secure=secure_client,
1785n/a context=context,
1786n/a credentials=('foo', 'bar'))
1787n/a self.log_data = None
1788n/a root_logger.addHandler(self.h_hdlr)
1789n/a
1790n/a for method in ('GET', 'POST'):
1791n/a self.h_hdlr.method = method
1792n/a self.handled.clear()
1793n/a msg = "sp\xe4m"
1794n/a logger.error(msg)
1795n/a self.handled.wait()
1796n/a self.assertEqual(self.log_data.path, '/frob')
1797n/a self.assertEqual(self.command, method)
1798n/a if method == 'GET':
1799n/a d = parse_qs(self.log_data.query)
1800n/a else:
1801n/a d = parse_qs(self.post_data.decode('utf-8'))
1802n/a self.assertEqual(d['name'], ['http'])
1803n/a self.assertEqual(d['funcName'], ['test_output'])
1804n/a self.assertEqual(d['msg'], [msg])
1805n/a
1806n/a self.server.stop(2.0)
1807n/a self.root_logger.removeHandler(self.h_hdlr)
1808n/a self.h_hdlr.close()
1809n/a
1810n/aclass MemoryTest(BaseTest):
1811n/a
1812n/a """Test memory persistence of logger objects."""
1813n/a
1814n/a def setUp(self):
1815n/a """Create a dict to remember potentially destroyed objects."""
1816n/a BaseTest.setUp(self)
1817n/a self._survivors = {}
1818n/a
1819n/a def _watch_for_survival(self, *args):
1820n/a """Watch the given objects for survival, by creating weakrefs to
1821n/a them."""
1822n/a for obj in args:
1823n/a key = id(obj), repr(obj)
1824n/a self._survivors[key] = weakref.ref(obj)
1825n/a
1826n/a def _assertTruesurvival(self):
1827n/a """Assert that all objects watched for survival have survived."""
1828n/a # Trigger cycle breaking.
1829n/a gc.collect()
1830n/a dead = []
1831n/a for (id_, repr_), ref in self._survivors.items():
1832n/a if ref() is None:
1833n/a dead.append(repr_)
1834n/a if dead:
1835n/a self.fail("%d objects should have survived "
1836n/a "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
1837n/a
1838n/a def test_persistent_loggers(self):
1839n/a # Logger objects are persistent and retain their configuration, even
1840n/a # if visible references are destroyed.
1841n/a self.root_logger.setLevel(logging.INFO)
1842n/a foo = logging.getLogger("foo")
1843n/a self._watch_for_survival(foo)
1844n/a foo.setLevel(logging.DEBUG)
1845n/a self.root_logger.debug(self.next_message())
1846n/a foo.debug(self.next_message())
1847n/a self.assert_log_lines([
1848n/a ('foo', 'DEBUG', '2'),
1849n/a ])
1850n/a del foo
1851n/a # foo has survived.
1852n/a self._assertTruesurvival()
1853n/a # foo has retained its settings.
1854n/a bar = logging.getLogger("foo")
1855n/a bar.debug(self.next_message())
1856n/a self.assert_log_lines([
1857n/a ('foo', 'DEBUG', '2'),
1858n/a ('foo', 'DEBUG', '3'),
1859n/a ])
1860n/a
1861n/a
1862n/aclass EncodingTest(BaseTest):
1863n/a def test_encoding_plain_file(self):
1864n/a # In Python 2.x, a plain file object is treated as having no encoding.
1865n/a log = logging.getLogger("test")
1866n/a fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
1867n/a os.close(fd)
1868n/a # the non-ascii data we write to the log.
1869n/a data = "foo\x80"
1870n/a try:
1871n/a handler = logging.FileHandler(fn, encoding="utf-8")
1872n/a log.addHandler(handler)
1873n/a try:
1874n/a # write non-ascii data to the log.
1875n/a log.warning(data)
1876n/a finally:
1877n/a log.removeHandler(handler)
1878n/a handler.close()
1879n/a # check we wrote exactly those bytes, ignoring trailing \n etc
1880n/a f = open(fn, encoding="utf-8")
1881n/a try:
1882n/a self.assertEqual(f.read().rstrip(), data)
1883n/a finally:
1884n/a f.close()
1885n/a finally:
1886n/a if os.path.isfile(fn):
1887n/a os.remove(fn)
1888n/a
1889n/a def test_encoding_cyrillic_unicode(self):
1890n/a log = logging.getLogger("test")
1891n/a #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
1892n/a message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
1893n/a #Ensure it's written in a Cyrillic encoding
1894n/a writer_class = codecs.getwriter('cp1251')
1895n/a writer_class.encoding = 'cp1251'
1896n/a stream = io.BytesIO()
1897n/a writer = writer_class(stream, 'strict')
1898n/a handler = logging.StreamHandler(writer)
1899n/a log.addHandler(handler)
1900n/a try:
1901n/a log.warning(message)
1902n/a finally:
1903n/a log.removeHandler(handler)
1904n/a handler.close()
1905n/a # check we wrote exactly those bytes, ignoring trailing \n etc
1906n/a s = stream.getvalue()
1907n/a #Compare against what the data should be when encoded in CP-1251
1908n/a self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
1909n/a
1910n/a
1911n/aclass WarningsTest(BaseTest):
1912n/a
1913n/a def test_warnings(self):
1914n/a with warnings.catch_warnings():
1915n/a logging.captureWarnings(True)
1916n/a self.addCleanup(logging.captureWarnings, False)
1917n/a warnings.filterwarnings("always", category=UserWarning)
1918n/a stream = io.StringIO()
1919n/a h = logging.StreamHandler(stream)
1920n/a logger = logging.getLogger("py.warnings")
1921n/a logger.addHandler(h)
1922n/a warnings.warn("I'm warning you...")
1923n/a logger.removeHandler(h)
1924n/a s = stream.getvalue()
1925n/a h.close()
1926n/a self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
1927n/a
1928n/a #See if an explicit file uses the original implementation
1929n/a a_file = io.StringIO()
1930n/a warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
1931n/a a_file, "Dummy line")
1932n/a s = a_file.getvalue()
1933n/a a_file.close()
1934n/a self.assertEqual(s,
1935n/a "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
1936n/a
1937n/a def test_warnings_no_handlers(self):
1938n/a with warnings.catch_warnings():
1939n/a logging.captureWarnings(True)
1940n/a self.addCleanup(logging.captureWarnings, False)
1941n/a
1942n/a # confirm our assumption: no loggers are set
1943n/a logger = logging.getLogger("py.warnings")
1944n/a self.assertEqual(logger.handlers, [])
1945n/a
1946n/a warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
1947n/a self.assertEqual(len(logger.handlers), 1)
1948n/a self.assertIsInstance(logger.handlers[0], logging.NullHandler)
1949n/a
1950n/a
1951n/adef formatFunc(format, datefmt=None):
1952n/a return logging.Formatter(format, datefmt)
1953n/a
1954n/adef handlerFunc():
1955n/a return logging.StreamHandler()
1956n/a
1957n/aclass CustomHandler(logging.StreamHandler):
1958n/a pass
1959n/a
1960n/aclass ConfigDictTest(BaseTest):
1961n/a
1962n/a """Reading logging config from a dictionary."""
1963n/a
1964n/a expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1965n/a
1966n/a # config0 is a standard configuration.
1967n/a config0 = {
1968n/a 'version': 1,
1969n/a 'formatters': {
1970n/a 'form1' : {
1971n/a 'format' : '%(levelname)s ++ %(message)s',
1972n/a },
1973n/a },
1974n/a 'handlers' : {
1975n/a 'hand1' : {
1976n/a 'class' : 'logging.StreamHandler',
1977n/a 'formatter' : 'form1',
1978n/a 'level' : 'NOTSET',
1979n/a 'stream' : 'ext://sys.stdout',
1980n/a },
1981n/a },
1982n/a 'root' : {
1983n/a 'level' : 'WARNING',
1984n/a 'handlers' : ['hand1'],
1985n/a },
1986n/a }
1987n/a
1988n/a # config1 adds a little to the standard configuration.
1989n/a config1 = {
1990n/a 'version': 1,
1991n/a 'formatters': {
1992n/a 'form1' : {
1993n/a 'format' : '%(levelname)s ++ %(message)s',
1994n/a },
1995n/a },
1996n/a 'handlers' : {
1997n/a 'hand1' : {
1998n/a 'class' : 'logging.StreamHandler',
1999n/a 'formatter' : 'form1',
2000n/a 'level' : 'NOTSET',
2001n/a 'stream' : 'ext://sys.stdout',
2002n/a },
2003n/a },
2004n/a 'loggers' : {
2005n/a 'compiler.parser' : {
2006n/a 'level' : 'DEBUG',
2007n/a 'handlers' : ['hand1'],
2008n/a },
2009n/a },
2010n/a 'root' : {
2011n/a 'level' : 'WARNING',
2012n/a },
2013n/a }
2014n/a
2015n/a # config1a moves the handler to the root. Used with config8a
2016n/a config1a = {
2017n/a 'version': 1,
2018n/a 'formatters': {
2019n/a 'form1' : {
2020n/a 'format' : '%(levelname)s ++ %(message)s',
2021n/a },
2022n/a },
2023n/a 'handlers' : {
2024n/a 'hand1' : {
2025n/a 'class' : 'logging.StreamHandler',
2026n/a 'formatter' : 'form1',
2027n/a 'level' : 'NOTSET',
2028n/a 'stream' : 'ext://sys.stdout',
2029n/a },
2030n/a },
2031n/a 'loggers' : {
2032n/a 'compiler.parser' : {
2033n/a 'level' : 'DEBUG',
2034n/a },
2035n/a },
2036n/a 'root' : {
2037n/a 'level' : 'WARNING',
2038n/a 'handlers' : ['hand1'],
2039n/a },
2040n/a }
2041n/a
2042n/a # config2 has a subtle configuration error that should be reported
2043n/a config2 = {
2044n/a 'version': 1,
2045n/a 'formatters': {
2046n/a 'form1' : {
2047n/a 'format' : '%(levelname)s ++ %(message)s',
2048n/a },
2049n/a },
2050n/a 'handlers' : {
2051n/a 'hand1' : {
2052n/a 'class' : 'logging.StreamHandler',
2053n/a 'formatter' : 'form1',
2054n/a 'level' : 'NOTSET',
2055n/a 'stream' : 'ext://sys.stdbout',
2056n/a },
2057n/a },
2058n/a 'loggers' : {
2059n/a 'compiler.parser' : {
2060n/a 'level' : 'DEBUG',
2061n/a 'handlers' : ['hand1'],
2062n/a },
2063n/a },
2064n/a 'root' : {
2065n/a 'level' : 'WARNING',
2066n/a },
2067n/a }
2068n/a
2069n/a #As config1 but with a misspelt level on a handler
2070n/a config2a = {
2071n/a 'version': 1,
2072n/a 'formatters': {
2073n/a 'form1' : {
2074n/a 'format' : '%(levelname)s ++ %(message)s',
2075n/a },
2076n/a },
2077n/a 'handlers' : {
2078n/a 'hand1' : {
2079n/a 'class' : 'logging.StreamHandler',
2080n/a 'formatter' : 'form1',
2081n/a 'level' : 'NTOSET',
2082n/a 'stream' : 'ext://sys.stdout',
2083n/a },
2084n/a },
2085n/a 'loggers' : {
2086n/a 'compiler.parser' : {
2087n/a 'level' : 'DEBUG',
2088n/a 'handlers' : ['hand1'],
2089n/a },
2090n/a },
2091n/a 'root' : {
2092n/a 'level' : 'WARNING',
2093n/a },
2094n/a }
2095n/a
2096n/a
2097n/a #As config1 but with a misspelt level on a logger
2098n/a config2b = {
2099n/a 'version': 1,
2100n/a 'formatters': {
2101n/a 'form1' : {
2102n/a 'format' : '%(levelname)s ++ %(message)s',
2103n/a },
2104n/a },
2105n/a 'handlers' : {
2106n/a 'hand1' : {
2107n/a 'class' : 'logging.StreamHandler',
2108n/a 'formatter' : 'form1',
2109n/a 'level' : 'NOTSET',
2110n/a 'stream' : 'ext://sys.stdout',
2111n/a },
2112n/a },
2113n/a 'loggers' : {
2114n/a 'compiler.parser' : {
2115n/a 'level' : 'DEBUG',
2116n/a 'handlers' : ['hand1'],
2117n/a },
2118n/a },
2119n/a 'root' : {
2120n/a 'level' : 'WRANING',
2121n/a },
2122n/a }
2123n/a
2124n/a # config3 has a less subtle configuration error
2125n/a config3 = {
2126n/a 'version': 1,
2127n/a 'formatters': {
2128n/a 'form1' : {
2129n/a 'format' : '%(levelname)s ++ %(message)s',
2130n/a },
2131n/a },
2132n/a 'handlers' : {
2133n/a 'hand1' : {
2134n/a 'class' : 'logging.StreamHandler',
2135n/a 'formatter' : 'misspelled_name',
2136n/a 'level' : 'NOTSET',
2137n/a 'stream' : 'ext://sys.stdout',
2138n/a },
2139n/a },
2140n/a 'loggers' : {
2141n/a 'compiler.parser' : {
2142n/a 'level' : 'DEBUG',
2143n/a 'handlers' : ['hand1'],
2144n/a },
2145n/a },
2146n/a 'root' : {
2147n/a 'level' : 'WARNING',
2148n/a },
2149n/a }
2150n/a
2151n/a # config4 specifies a custom formatter class to be loaded
2152n/a config4 = {
2153n/a 'version': 1,
2154n/a 'formatters': {
2155n/a 'form1' : {
2156n/a '()' : __name__ + '.ExceptionFormatter',
2157n/a 'format' : '%(levelname)s:%(name)s:%(message)s',
2158n/a },
2159n/a },
2160n/a 'handlers' : {
2161n/a 'hand1' : {
2162n/a 'class' : 'logging.StreamHandler',
2163n/a 'formatter' : 'form1',
2164n/a 'level' : 'NOTSET',
2165n/a 'stream' : 'ext://sys.stdout',
2166n/a },
2167n/a },
2168n/a 'root' : {
2169n/a 'level' : 'NOTSET',
2170n/a 'handlers' : ['hand1'],
2171n/a },
2172n/a }
2173n/a
2174n/a # As config4 but using an actual callable rather than a string
2175n/a config4a = {
2176n/a 'version': 1,
2177n/a 'formatters': {
2178n/a 'form1' : {
2179n/a '()' : ExceptionFormatter,
2180n/a 'format' : '%(levelname)s:%(name)s:%(message)s',
2181n/a },
2182n/a 'form2' : {
2183n/a '()' : __name__ + '.formatFunc',
2184n/a 'format' : '%(levelname)s:%(name)s:%(message)s',
2185n/a },
2186n/a 'form3' : {
2187n/a '()' : formatFunc,
2188n/a 'format' : '%(levelname)s:%(name)s:%(message)s',
2189n/a },
2190n/a },
2191n/a 'handlers' : {
2192n/a 'hand1' : {
2193n/a 'class' : 'logging.StreamHandler',
2194n/a 'formatter' : 'form1',
2195n/a 'level' : 'NOTSET',
2196n/a 'stream' : 'ext://sys.stdout',
2197n/a },
2198n/a 'hand2' : {
2199n/a '()' : handlerFunc,
2200n/a },
2201n/a },
2202n/a 'root' : {
2203n/a 'level' : 'NOTSET',
2204n/a 'handlers' : ['hand1'],
2205n/a },
2206n/a }
2207n/a
2208n/a # config5 specifies a custom handler class to be loaded
2209n/a config5 = {
2210n/a 'version': 1,
2211n/a 'formatters': {
2212n/a 'form1' : {
2213n/a 'format' : '%(levelname)s ++ %(message)s',
2214n/a },
2215n/a },
2216n/a 'handlers' : {
2217n/a 'hand1' : {
2218n/a 'class' : __name__ + '.CustomHandler',
2219n/a 'formatter' : 'form1',
2220n/a 'level' : 'NOTSET',
2221n/a 'stream' : 'ext://sys.stdout',
2222n/a },
2223n/a },
2224n/a 'loggers' : {
2225n/a 'compiler.parser' : {
2226n/a 'level' : 'DEBUG',
2227n/a 'handlers' : ['hand1'],
2228n/a },
2229n/a },
2230n/a 'root' : {
2231n/a 'level' : 'WARNING',
2232n/a },
2233n/a }
2234n/a
2235n/a # config6 specifies a custom handler class to be loaded
2236n/a # but has bad arguments
2237n/a config6 = {
2238n/a 'version': 1,
2239n/a 'formatters': {
2240n/a 'form1' : {
2241n/a 'format' : '%(levelname)s ++ %(message)s',
2242n/a },
2243n/a },
2244n/a 'handlers' : {
2245n/a 'hand1' : {
2246n/a 'class' : __name__ + '.CustomHandler',
2247n/a 'formatter' : 'form1',
2248n/a 'level' : 'NOTSET',
2249n/a 'stream' : 'ext://sys.stdout',
2250n/a '9' : 'invalid parameter name',
2251n/a },
2252n/a },
2253n/a 'loggers' : {
2254n/a 'compiler.parser' : {
2255n/a 'level' : 'DEBUG',
2256n/a 'handlers' : ['hand1'],
2257n/a },
2258n/a },
2259n/a 'root' : {
2260n/a 'level' : 'WARNING',
2261n/a },
2262n/a }
2263n/a
2264n/a #config 7 does not define compiler.parser but defines compiler.lexer
2265n/a #so compiler.parser should be disabled after applying it
2266n/a config7 = {
2267n/a 'version': 1,
2268n/a 'formatters': {
2269n/a 'form1' : {
2270n/a 'format' : '%(levelname)s ++ %(message)s',
2271n/a },
2272n/a },
2273n/a 'handlers' : {
2274n/a 'hand1' : {
2275n/a 'class' : 'logging.StreamHandler',
2276n/a 'formatter' : 'form1',
2277n/a 'level' : 'NOTSET',
2278n/a 'stream' : 'ext://sys.stdout',
2279n/a },
2280n/a },
2281n/a 'loggers' : {
2282n/a 'compiler.lexer' : {
2283n/a 'level' : 'DEBUG',
2284n/a 'handlers' : ['hand1'],
2285n/a },
2286n/a },
2287n/a 'root' : {
2288n/a 'level' : 'WARNING',
2289n/a },
2290n/a }
2291n/a
2292n/a # config8 defines both compiler and compiler.lexer
2293n/a # so compiler.parser should not be disabled (since
2294n/a # compiler is defined)
2295n/a config8 = {
2296n/a 'version': 1,
2297n/a 'disable_existing_loggers' : False,
2298n/a 'formatters': {
2299n/a 'form1' : {
2300n/a 'format' : '%(levelname)s ++ %(message)s',
2301n/a },
2302n/a },
2303n/a 'handlers' : {
2304n/a 'hand1' : {
2305n/a 'class' : 'logging.StreamHandler',
2306n/a 'formatter' : 'form1',
2307n/a 'level' : 'NOTSET',
2308n/a 'stream' : 'ext://sys.stdout',
2309n/a },
2310n/a },
2311n/a 'loggers' : {
2312n/a 'compiler' : {
2313n/a 'level' : 'DEBUG',
2314n/a 'handlers' : ['hand1'],
2315n/a },
2316n/a 'compiler.lexer' : {
2317n/a },
2318n/a },
2319n/a 'root' : {
2320n/a 'level' : 'WARNING',
2321n/a },
2322n/a }
2323n/a
2324n/a # config8a disables existing loggers
2325n/a config8a = {
2326n/a 'version': 1,
2327n/a 'disable_existing_loggers' : True,
2328n/a 'formatters': {
2329n/a 'form1' : {
2330n/a 'format' : '%(levelname)s ++ %(message)s',
2331n/a },
2332n/a },
2333n/a 'handlers' : {
2334n/a 'hand1' : {
2335n/a 'class' : 'logging.StreamHandler',
2336n/a 'formatter' : 'form1',
2337n/a 'level' : 'NOTSET',
2338n/a 'stream' : 'ext://sys.stdout',
2339n/a },
2340n/a },
2341n/a 'loggers' : {
2342n/a 'compiler' : {
2343n/a 'level' : 'DEBUG',
2344n/a 'handlers' : ['hand1'],
2345n/a },
2346n/a 'compiler.lexer' : {
2347n/a },
2348n/a },
2349n/a 'root' : {
2350n/a 'level' : 'WARNING',
2351n/a },
2352n/a }
2353n/a
2354n/a config9 = {
2355n/a 'version': 1,
2356n/a 'formatters': {
2357n/a 'form1' : {
2358n/a 'format' : '%(levelname)s ++ %(message)s',
2359n/a },
2360n/a },
2361n/a 'handlers' : {
2362n/a 'hand1' : {
2363n/a 'class' : 'logging.StreamHandler',
2364n/a 'formatter' : 'form1',
2365n/a 'level' : 'WARNING',
2366n/a 'stream' : 'ext://sys.stdout',
2367n/a },
2368n/a },
2369n/a 'loggers' : {
2370n/a 'compiler.parser' : {
2371n/a 'level' : 'WARNING',
2372n/a 'handlers' : ['hand1'],
2373n/a },
2374n/a },
2375n/a 'root' : {
2376n/a 'level' : 'NOTSET',
2377n/a },
2378n/a }
2379n/a
2380n/a config9a = {
2381n/a 'version': 1,
2382n/a 'incremental' : True,
2383n/a 'handlers' : {
2384n/a 'hand1' : {
2385n/a 'level' : 'WARNING',
2386n/a },
2387n/a },
2388n/a 'loggers' : {
2389n/a 'compiler.parser' : {
2390n/a 'level' : 'INFO',
2391n/a },
2392n/a },
2393n/a }
2394n/a
2395n/a config9b = {
2396n/a 'version': 1,
2397n/a 'incremental' : True,
2398n/a 'handlers' : {
2399n/a 'hand1' : {
2400n/a 'level' : 'INFO',
2401n/a },
2402n/a },
2403n/a 'loggers' : {
2404n/a 'compiler.parser' : {
2405n/a 'level' : 'INFO',
2406n/a },
2407n/a },
2408n/a }
2409n/a
2410n/a #As config1 but with a filter added
2411n/a config10 = {
2412n/a 'version': 1,
2413n/a 'formatters': {
2414n/a 'form1' : {
2415n/a 'format' : '%(levelname)s ++ %(message)s',
2416n/a },
2417n/a },
2418n/a 'filters' : {
2419n/a 'filt1' : {
2420n/a 'name' : 'compiler.parser',
2421n/a },
2422n/a },
2423n/a 'handlers' : {
2424n/a 'hand1' : {
2425n/a 'class' : 'logging.StreamHandler',
2426n/a 'formatter' : 'form1',
2427n/a 'level' : 'NOTSET',
2428n/a 'stream' : 'ext://sys.stdout',
2429n/a 'filters' : ['filt1'],
2430n/a },
2431n/a },
2432n/a 'loggers' : {
2433n/a 'compiler.parser' : {
2434n/a 'level' : 'DEBUG',
2435n/a 'filters' : ['filt1'],
2436n/a },
2437n/a },
2438n/a 'root' : {
2439n/a 'level' : 'WARNING',
2440n/a 'handlers' : ['hand1'],
2441n/a },
2442n/a }
2443n/a
2444n/a #As config1 but using cfg:// references
2445n/a config11 = {
2446n/a 'version': 1,
2447n/a 'true_formatters': {
2448n/a 'form1' : {
2449n/a 'format' : '%(levelname)s ++ %(message)s',
2450n/a },
2451n/a },
2452n/a 'handler_configs': {
2453n/a 'hand1' : {
2454n/a 'class' : 'logging.StreamHandler',
2455n/a 'formatter' : 'form1',
2456n/a 'level' : 'NOTSET',
2457n/a 'stream' : 'ext://sys.stdout',
2458n/a },
2459n/a },
2460n/a 'formatters' : 'cfg://true_formatters',
2461n/a 'handlers' : {
2462n/a 'hand1' : 'cfg://handler_configs[hand1]',
2463n/a },
2464n/a 'loggers' : {
2465n/a 'compiler.parser' : {
2466n/a 'level' : 'DEBUG',
2467n/a 'handlers' : ['hand1'],
2468n/a },
2469n/a },
2470n/a 'root' : {
2471n/a 'level' : 'WARNING',
2472n/a },
2473n/a }
2474n/a
2475n/a #As config11 but missing the version key
2476n/a config12 = {
2477n/a 'true_formatters': {
2478n/a 'form1' : {
2479n/a 'format' : '%(levelname)s ++ %(message)s',
2480n/a },
2481n/a },
2482n/a 'handler_configs': {
2483n/a 'hand1' : {
2484n/a 'class' : 'logging.StreamHandler',
2485n/a 'formatter' : 'form1',
2486n/a 'level' : 'NOTSET',
2487n/a 'stream' : 'ext://sys.stdout',
2488n/a },
2489n/a },
2490n/a 'formatters' : 'cfg://true_formatters',
2491n/a 'handlers' : {
2492n/a 'hand1' : 'cfg://handler_configs[hand1]',
2493n/a },
2494n/a 'loggers' : {
2495n/a 'compiler.parser' : {
2496n/a 'level' : 'DEBUG',
2497n/a 'handlers' : ['hand1'],
2498n/a },
2499n/a },
2500n/a 'root' : {
2501n/a 'level' : 'WARNING',
2502n/a },
2503n/a }
2504n/a
2505n/a #As config11 but using an unsupported version
2506n/a config13 = {
2507n/a 'version': 2,
2508n/a 'true_formatters': {
2509n/a 'form1' : {
2510n/a 'format' : '%(levelname)s ++ %(message)s',
2511n/a },
2512n/a },
2513n/a 'handler_configs': {
2514n/a 'hand1' : {
2515n/a 'class' : 'logging.StreamHandler',
2516n/a 'formatter' : 'form1',
2517n/a 'level' : 'NOTSET',
2518n/a 'stream' : 'ext://sys.stdout',
2519n/a },
2520n/a },
2521n/a 'formatters' : 'cfg://true_formatters',
2522n/a 'handlers' : {
2523n/a 'hand1' : 'cfg://handler_configs[hand1]',
2524n/a },
2525n/a 'loggers' : {
2526n/a 'compiler.parser' : {
2527n/a 'level' : 'DEBUG',
2528n/a 'handlers' : ['hand1'],
2529n/a },
2530n/a },
2531n/a 'root' : {
2532n/a 'level' : 'WARNING',
2533n/a },
2534n/a }
2535n/a
2536n/a # As config0, but with properties
2537n/a config14 = {
2538n/a 'version': 1,
2539n/a 'formatters': {
2540n/a 'form1' : {
2541n/a 'format' : '%(levelname)s ++ %(message)s',
2542n/a },
2543n/a },
2544n/a 'handlers' : {
2545n/a 'hand1' : {
2546n/a 'class' : 'logging.StreamHandler',
2547n/a 'formatter' : 'form1',
2548n/a 'level' : 'NOTSET',
2549n/a 'stream' : 'ext://sys.stdout',
2550n/a '.': {
2551n/a 'foo': 'bar',
2552n/a 'terminator': '!\n',
2553n/a }
2554n/a },
2555n/a },
2556n/a 'root' : {
2557n/a 'level' : 'WARNING',
2558n/a 'handlers' : ['hand1'],
2559n/a },
2560n/a }
2561n/a
2562n/a out_of_order = {
2563n/a "version": 1,
2564n/a "formatters": {
2565n/a "mySimpleFormatter": {
2566n/a "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2567n/a "style": "$"
2568n/a }
2569n/a },
2570n/a "handlers": {
2571n/a "fileGlobal": {
2572n/a "class": "logging.StreamHandler",
2573n/a "level": "DEBUG",
2574n/a "formatter": "mySimpleFormatter"
2575n/a },
2576n/a "bufferGlobal": {
2577n/a "class": "logging.handlers.MemoryHandler",
2578n/a "capacity": 5,
2579n/a "formatter": "mySimpleFormatter",
2580n/a "target": "fileGlobal",
2581n/a "level": "DEBUG"
2582n/a }
2583n/a },
2584n/a "loggers": {
2585n/a "mymodule": {
2586n/a "level": "DEBUG",
2587n/a "handlers": ["bufferGlobal"],
2588n/a "propagate": "true"
2589n/a }
2590n/a }
2591n/a }
2592n/a
2593n/a def apply_config(self, conf):
2594n/a logging.config.dictConfig(conf)
2595n/a
2596n/a def test_config0_ok(self):
2597n/a # A simple config which overrides the default settings.
2598n/a with support.captured_stdout() as output:
2599n/a self.apply_config(self.config0)
2600n/a logger = logging.getLogger()
2601n/a # Won't output anything
2602n/a logger.info(self.next_message())
2603n/a # Outputs a message
2604n/a logger.error(self.next_message())
2605n/a self.assert_log_lines([
2606n/a ('ERROR', '2'),
2607n/a ], stream=output)
2608n/a # Original logger output is empty.
2609n/a self.assert_log_lines([])
2610n/a
2611n/a def test_config1_ok(self, config=config1):
2612n/a # A config defining a sub-parser as well.
2613n/a with support.captured_stdout() as output:
2614n/a self.apply_config(config)
2615n/a logger = logging.getLogger("compiler.parser")
2616n/a # Both will output a message
2617n/a logger.info(self.next_message())
2618n/a logger.error(self.next_message())
2619n/a self.assert_log_lines([
2620n/a ('INFO', '1'),
2621n/a ('ERROR', '2'),
2622n/a ], stream=output)
2623n/a # Original logger output is empty.
2624n/a self.assert_log_lines([])
2625n/a
2626n/a def test_config2_failure(self):
2627n/a # A simple config which overrides the default settings.
2628n/a self.assertRaises(Exception, self.apply_config, self.config2)
2629n/a
2630n/a def test_config2a_failure(self):
2631n/a # A simple config which overrides the default settings.
2632n/a self.assertRaises(Exception, self.apply_config, self.config2a)
2633n/a
2634n/a def test_config2b_failure(self):
2635n/a # A simple config which overrides the default settings.
2636n/a self.assertRaises(Exception, self.apply_config, self.config2b)
2637n/a
2638n/a def test_config3_failure(self):
2639n/a # A simple config which overrides the default settings.
2640n/a self.assertRaises(Exception, self.apply_config, self.config3)
2641n/a
2642n/a def test_config4_ok(self):
2643n/a # A config specifying a custom formatter class.
2644n/a with support.captured_stdout() as output:
2645n/a self.apply_config(self.config4)
2646n/a #logger = logging.getLogger()
2647n/a try:
2648n/a raise RuntimeError()
2649n/a except RuntimeError:
2650n/a logging.exception("just testing")
2651n/a sys.stdout.seek(0)
2652n/a self.assertEqual(output.getvalue(),
2653n/a "ERROR:root:just testing\nGot a [RuntimeError]\n")
2654n/a # Original logger output is empty
2655n/a self.assert_log_lines([])
2656n/a
2657n/a def test_config4a_ok(self):
2658n/a # A config specifying a custom formatter class.
2659n/a with support.captured_stdout() as output:
2660n/a self.apply_config(self.config4a)
2661n/a #logger = logging.getLogger()
2662n/a try:
2663n/a raise RuntimeError()
2664n/a except RuntimeError:
2665n/a logging.exception("just testing")
2666n/a sys.stdout.seek(0)
2667n/a self.assertEqual(output.getvalue(),
2668n/a "ERROR:root:just testing\nGot a [RuntimeError]\n")
2669n/a # Original logger output is empty
2670n/a self.assert_log_lines([])
2671n/a
2672n/a def test_config5_ok(self):
2673n/a self.test_config1_ok(config=self.config5)
2674n/a
2675n/a def test_config6_failure(self):
2676n/a self.assertRaises(Exception, self.apply_config, self.config6)
2677n/a
2678n/a def test_config7_ok(self):
2679n/a with support.captured_stdout() as output:
2680n/a self.apply_config(self.config1)
2681n/a logger = logging.getLogger("compiler.parser")
2682n/a # Both will output a message
2683n/a logger.info(self.next_message())
2684n/a logger.error(self.next_message())
2685n/a self.assert_log_lines([
2686n/a ('INFO', '1'),
2687n/a ('ERROR', '2'),
2688n/a ], stream=output)
2689n/a # Original logger output is empty.
2690n/a self.assert_log_lines([])
2691n/a with support.captured_stdout() as output:
2692n/a self.apply_config(self.config7)
2693n/a logger = logging.getLogger("compiler.parser")
2694n/a self.assertTrue(logger.disabled)
2695n/a logger = logging.getLogger("compiler.lexer")
2696n/a # Both will output a message
2697n/a logger.info(self.next_message())
2698n/a logger.error(self.next_message())
2699n/a self.assert_log_lines([
2700n/a ('INFO', '3'),
2701n/a ('ERROR', '4'),
2702n/a ], stream=output)
2703n/a # Original logger output is empty.
2704n/a self.assert_log_lines([])
2705n/a
2706n/a #Same as test_config_7_ok but don't disable old loggers.
2707n/a def test_config_8_ok(self):
2708n/a with support.captured_stdout() as output:
2709n/a self.apply_config(self.config1)
2710n/a logger = logging.getLogger("compiler.parser")
2711n/a # All will output a message
2712n/a logger.info(self.next_message())
2713n/a logger.error(self.next_message())
2714n/a self.assert_log_lines([
2715n/a ('INFO', '1'),
2716n/a ('ERROR', '2'),
2717n/a ], stream=output)
2718n/a # Original logger output is empty.
2719n/a self.assert_log_lines([])
2720n/a with support.captured_stdout() as output:
2721n/a self.apply_config(self.config8)
2722n/a logger = logging.getLogger("compiler.parser")
2723n/a self.assertFalse(logger.disabled)
2724n/a # Both will output a message
2725n/a logger.info(self.next_message())
2726n/a logger.error(self.next_message())
2727n/a logger = logging.getLogger("compiler.lexer")
2728n/a # Both will output a message
2729n/a logger.info(self.next_message())
2730n/a logger.error(self.next_message())
2731n/a self.assert_log_lines([
2732n/a ('INFO', '3'),
2733n/a ('ERROR', '4'),
2734n/a ('INFO', '5'),
2735n/a ('ERROR', '6'),
2736n/a ], stream=output)
2737n/a # Original logger output is empty.
2738n/a self.assert_log_lines([])
2739n/a
2740n/a def test_config_8a_ok(self):
2741n/a with support.captured_stdout() as output:
2742n/a self.apply_config(self.config1a)
2743n/a logger = logging.getLogger("compiler.parser")
2744n/a # See issue #11424. compiler-hyphenated sorts
2745n/a # between compiler and compiler.xyz and this
2746n/a # was preventing compiler.xyz from being included
2747n/a # in the child loggers of compiler because of an
2748n/a # overzealous loop termination condition.
2749n/a hyphenated = logging.getLogger('compiler-hyphenated')
2750n/a # All will output a message
2751n/a logger.info(self.next_message())
2752n/a logger.error(self.next_message())
2753n/a hyphenated.critical(self.next_message())
2754n/a self.assert_log_lines([
2755n/a ('INFO', '1'),
2756n/a ('ERROR', '2'),
2757n/a ('CRITICAL', '3'),
2758n/a ], stream=output)
2759n/a # Original logger output is empty.
2760n/a self.assert_log_lines([])
2761n/a with support.captured_stdout() as output:
2762n/a self.apply_config(self.config8a)
2763n/a logger = logging.getLogger("compiler.parser")
2764n/a self.assertFalse(logger.disabled)
2765n/a # Both will output a message
2766n/a logger.info(self.next_message())
2767n/a logger.error(self.next_message())
2768n/a logger = logging.getLogger("compiler.lexer")
2769n/a # Both will output a message
2770n/a logger.info(self.next_message())
2771n/a logger.error(self.next_message())
2772n/a # Will not appear
2773n/a hyphenated.critical(self.next_message())
2774n/a self.assert_log_lines([
2775n/a ('INFO', '4'),
2776n/a ('ERROR', '5'),
2777n/a ('INFO', '6'),
2778n/a ('ERROR', '7'),
2779n/a ], stream=output)
2780n/a # Original logger output is empty.
2781n/a self.assert_log_lines([])
2782n/a
2783n/a def test_config_9_ok(self):
2784n/a with support.captured_stdout() as output:
2785n/a self.apply_config(self.config9)
2786n/a logger = logging.getLogger("compiler.parser")
2787n/a #Nothing will be output since both handler and logger are set to WARNING
2788n/a logger.info(self.next_message())
2789n/a self.assert_log_lines([], stream=output)
2790n/a self.apply_config(self.config9a)
2791n/a #Nothing will be output since both handler is still set to WARNING
2792n/a logger.info(self.next_message())
2793n/a self.assert_log_lines([], stream=output)
2794n/a self.apply_config(self.config9b)
2795n/a #Message should now be output
2796n/a logger.info(self.next_message())
2797n/a self.assert_log_lines([
2798n/a ('INFO', '3'),
2799n/a ], stream=output)
2800n/a
2801n/a def test_config_10_ok(self):
2802n/a with support.captured_stdout() as output:
2803n/a self.apply_config(self.config10)
2804n/a logger = logging.getLogger("compiler.parser")
2805n/a logger.warning(self.next_message())
2806n/a logger = logging.getLogger('compiler')
2807n/a #Not output, because filtered
2808n/a logger.warning(self.next_message())
2809n/a logger = logging.getLogger('compiler.lexer')
2810n/a #Not output, because filtered
2811n/a logger.warning(self.next_message())
2812n/a logger = logging.getLogger("compiler.parser.codegen")
2813n/a #Output, as not filtered
2814n/a logger.error(self.next_message())
2815n/a self.assert_log_lines([
2816n/a ('WARNING', '1'),
2817n/a ('ERROR', '4'),
2818n/a ], stream=output)
2819n/a
2820n/a def test_config11_ok(self):
2821n/a self.test_config1_ok(self.config11)
2822n/a
2823n/a def test_config12_failure(self):
2824n/a self.assertRaises(Exception, self.apply_config, self.config12)
2825n/a
2826n/a def test_config13_failure(self):
2827n/a self.assertRaises(Exception, self.apply_config, self.config13)
2828n/a
2829n/a def test_config14_ok(self):
2830n/a with support.captured_stdout() as output:
2831n/a self.apply_config(self.config14)
2832n/a h = logging._handlers['hand1']
2833n/a self.assertEqual(h.foo, 'bar')
2834n/a self.assertEqual(h.terminator, '!\n')
2835n/a logging.warning('Exclamation')
2836n/a self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
2837n/a
2838n/a @unittest.skipUnless(threading, 'listen() needs threading to work')
2839n/a def setup_via_listener(self, text, verify=None):
2840n/a text = text.encode("utf-8")
2841n/a # Ask for a randomly assigned port (by using port 0)
2842n/a t = logging.config.listen(0, verify)
2843n/a t.start()
2844n/a t.ready.wait()
2845n/a # Now get the port allocated
2846n/a port = t.port
2847n/a t.ready.clear()
2848n/a try:
2849n/a sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2850n/a sock.settimeout(2.0)
2851n/a sock.connect(('localhost', port))
2852n/a
2853n/a slen = struct.pack('>L', len(text))
2854n/a s = slen + text
2855n/a sentsofar = 0
2856n/a left = len(s)
2857n/a while left > 0:
2858n/a sent = sock.send(s[sentsofar:])
2859n/a sentsofar += sent
2860n/a left -= sent
2861n/a sock.close()
2862n/a finally:
2863n/a t.ready.wait(2.0)
2864n/a logging.config.stopListening()
2865n/a t.join(2.0)
2866n/a
2867n/a @unittest.skipUnless(threading, 'Threading required for this test.')
2868n/a def test_listen_config_10_ok(self):
2869n/a with support.captured_stdout() as output:
2870n/a self.setup_via_listener(json.dumps(self.config10))
2871n/a logger = logging.getLogger("compiler.parser")
2872n/a logger.warning(self.next_message())
2873n/a logger = logging.getLogger('compiler')
2874n/a #Not output, because filtered
2875n/a logger.warning(self.next_message())
2876n/a logger = logging.getLogger('compiler.lexer')
2877n/a #Not output, because filtered
2878n/a logger.warning(self.next_message())
2879n/a logger = logging.getLogger("compiler.parser.codegen")
2880n/a #Output, as not filtered
2881n/a logger.error(self.next_message())
2882n/a self.assert_log_lines([
2883n/a ('WARNING', '1'),
2884n/a ('ERROR', '4'),
2885n/a ], stream=output)
2886n/a
2887n/a @unittest.skipUnless(threading, 'Threading required for this test.')
2888n/a def test_listen_config_1_ok(self):
2889n/a with support.captured_stdout() as output:
2890n/a self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
2891n/a logger = logging.getLogger("compiler.parser")
2892n/a # Both will output a message
2893n/a logger.info(self.next_message())
2894n/a logger.error(self.next_message())
2895n/a self.assert_log_lines([
2896n/a ('INFO', '1'),
2897n/a ('ERROR', '2'),
2898n/a ], stream=output)
2899n/a # Original logger output is empty.
2900n/a self.assert_log_lines([])
2901n/a
2902n/a @unittest.skipUnless(threading, 'Threading required for this test.')
2903n/a def test_listen_verify(self):
2904n/a
2905n/a def verify_fail(stuff):
2906n/a return None
2907n/a
2908n/a def verify_reverse(stuff):
2909n/a return stuff[::-1]
2910n/a
2911n/a logger = logging.getLogger("compiler.parser")
2912n/a to_send = textwrap.dedent(ConfigFileTest.config1)
2913n/a # First, specify a verification function that will fail.
2914n/a # We expect to see no output, since our configuration
2915n/a # never took effect.
2916n/a with support.captured_stdout() as output:
2917n/a self.setup_via_listener(to_send, verify_fail)
2918n/a # Both will output a message
2919n/a logger.info(self.next_message())
2920n/a logger.error(self.next_message())
2921n/a self.assert_log_lines([], stream=output)
2922n/a # Original logger output has the stuff we logged.
2923n/a self.assert_log_lines([
2924n/a ('INFO', '1'),
2925n/a ('ERROR', '2'),
2926n/a ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2927n/a
2928n/a # Now, perform no verification. Our configuration
2929n/a # should take effect.
2930n/a
2931n/a with support.captured_stdout() as output:
2932n/a self.setup_via_listener(to_send) # no verify callable specified
2933n/a logger = logging.getLogger("compiler.parser")
2934n/a # Both will output a message
2935n/a logger.info(self.next_message())
2936n/a logger.error(self.next_message())
2937n/a self.assert_log_lines([
2938n/a ('INFO', '3'),
2939n/a ('ERROR', '4'),
2940n/a ], stream=output)
2941n/a # Original logger output still has the stuff we logged before.
2942n/a self.assert_log_lines([
2943n/a ('INFO', '1'),
2944n/a ('ERROR', '2'),
2945n/a ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2946n/a
2947n/a # Now, perform verification which transforms the bytes.
2948n/a
2949n/a with support.captured_stdout() as output:
2950n/a self.setup_via_listener(to_send[::-1], verify_reverse)
2951n/a logger = logging.getLogger("compiler.parser")
2952n/a # Both will output a message
2953n/a logger.info(self.next_message())
2954n/a logger.error(self.next_message())
2955n/a self.assert_log_lines([
2956n/a ('INFO', '5'),
2957n/a ('ERROR', '6'),
2958n/a ], stream=output)
2959n/a # Original logger output still has the stuff we logged before.
2960n/a self.assert_log_lines([
2961n/a ('INFO', '1'),
2962n/a ('ERROR', '2'),
2963n/a ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2964n/a
2965n/a def test_out_of_order(self):
2966n/a self.apply_config(self.out_of_order)
2967n/a handler = logging.getLogger('mymodule').handlers[0]
2968n/a self.assertIsInstance(handler.target, logging.Handler)
2969n/a self.assertIsInstance(handler.formatter._style,
2970n/a logging.StringTemplateStyle)
2971n/a
2972n/a def test_baseconfig(self):
2973n/a d = {
2974n/a 'atuple': (1, 2, 3),
2975n/a 'alist': ['a', 'b', 'c'],
2976n/a 'adict': {'d': 'e', 'f': 3 },
2977n/a 'nest1': ('g', ('h', 'i'), 'j'),
2978n/a 'nest2': ['k', ['l', 'm'], 'n'],
2979n/a 'nest3': ['o', 'cfg://alist', 'p'],
2980n/a }
2981n/a bc = logging.config.BaseConfigurator(d)
2982n/a self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
2983n/a self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
2984n/a self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
2985n/a self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
2986n/a self.assertEqual(bc.convert('cfg://adict.d'), 'e')
2987n/a self.assertEqual(bc.convert('cfg://adict[f]'), 3)
2988n/a v = bc.convert('cfg://nest3')
2989n/a self.assertEqual(v.pop(1), ['a', 'b', 'c'])
2990n/a self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
2991n/a self.assertRaises(ValueError, bc.convert, 'cfg://!')
2992n/a self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
2993n/a
2994n/aclass ManagerTest(BaseTest):
2995n/a def test_manager_loggerclass(self):
2996n/a logged = []
2997n/a
2998n/a class MyLogger(logging.Logger):
2999n/a def _log(self, level, msg, args, exc_info=None, extra=None):
3000n/a logged.append(msg)
3001n/a
3002n/a man = logging.Manager(None)
3003n/a self.assertRaises(TypeError, man.setLoggerClass, int)
3004n/a man.setLoggerClass(MyLogger)
3005n/a logger = man.getLogger('test')
3006n/a logger.warning('should appear in logged')
3007n/a logging.warning('should not appear in logged')
3008n/a
3009n/a self.assertEqual(logged, ['should appear in logged'])
3010n/a
3011n/a def test_set_log_record_factory(self):
3012n/a man = logging.Manager(None)
3013n/a expected = object()
3014n/a man.setLogRecordFactory(expected)
3015n/a self.assertEqual(man.logRecordFactory, expected)
3016n/a
3017n/aclass ChildLoggerTest(BaseTest):
3018n/a def test_child_loggers(self):
3019n/a r = logging.getLogger()
3020n/a l1 = logging.getLogger('abc')
3021n/a l2 = logging.getLogger('def.ghi')
3022n/a c1 = r.getChild('xyz')
3023n/a c2 = r.getChild('uvw.xyz')
3024n/a self.assertIs(c1, logging.getLogger('xyz'))
3025n/a self.assertIs(c2, logging.getLogger('uvw.xyz'))
3026n/a c1 = l1.getChild('def')
3027n/a c2 = c1.getChild('ghi')
3028n/a c3 = l1.getChild('def.ghi')
3029n/a self.assertIs(c1, logging.getLogger('abc.def'))
3030n/a self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3031n/a self.assertIs(c2, c3)
3032n/a
3033n/a
3034n/aclass DerivedLogRecord(logging.LogRecord):
3035n/a pass
3036n/a
3037n/aclass LogRecordFactoryTest(BaseTest):
3038n/a
3039n/a def setUp(self):
3040n/a class CheckingFilter(logging.Filter):
3041n/a def __init__(self, cls):
3042n/a self.cls = cls
3043n/a
3044n/a def filter(self, record):
3045n/a t = type(record)
3046n/a if t is not self.cls:
3047n/a msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3048n/a self.cls)
3049n/a raise TypeError(msg)
3050n/a return True
3051n/a
3052n/a BaseTest.setUp(self)
3053n/a self.filter = CheckingFilter(DerivedLogRecord)
3054n/a self.root_logger.addFilter(self.filter)
3055n/a self.orig_factory = logging.getLogRecordFactory()
3056n/a
3057n/a def tearDown(self):
3058n/a self.root_logger.removeFilter(self.filter)
3059n/a BaseTest.tearDown(self)
3060n/a logging.setLogRecordFactory(self.orig_factory)
3061n/a
3062n/a def test_logrecord_class(self):
3063n/a self.assertRaises(TypeError, self.root_logger.warning,
3064n/a self.next_message())
3065n/a logging.setLogRecordFactory(DerivedLogRecord)
3066n/a self.root_logger.error(self.next_message())
3067n/a self.assert_log_lines([
3068n/a ('root', 'ERROR', '2'),
3069n/a ])
3070n/a
3071n/a
3072n/aclass QueueHandlerTest(BaseTest):
3073n/a # Do not bother with a logger name group.
3074n/a expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3075n/a
3076n/a def setUp(self):
3077n/a BaseTest.setUp(self)
3078n/a self.queue = queue.Queue(-1)
3079n/a self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3080n/a self.que_logger = logging.getLogger('que')
3081n/a self.que_logger.propagate = False
3082n/a self.que_logger.setLevel(logging.WARNING)
3083n/a self.que_logger.addHandler(self.que_hdlr)
3084n/a
3085n/a def tearDown(self):
3086n/a self.que_hdlr.close()
3087n/a BaseTest.tearDown(self)
3088n/a
3089n/a def test_queue_handler(self):
3090n/a self.que_logger.debug(self.next_message())
3091n/a self.assertRaises(queue.Empty, self.queue.get_nowait)
3092n/a self.que_logger.info(self.next_message())
3093n/a self.assertRaises(queue.Empty, self.queue.get_nowait)
3094n/a msg = self.next_message()
3095n/a self.que_logger.warning(msg)
3096n/a data = self.queue.get_nowait()
3097n/a self.assertTrue(isinstance(data, logging.LogRecord))
3098n/a self.assertEqual(data.name, self.que_logger.name)
3099n/a self.assertEqual((data.msg, data.args), (msg, None))
3100n/a
3101n/a @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3102n/a 'logging.handlers.QueueListener required for this test')
3103n/a def test_queue_listener(self):
3104n/a handler = support.TestHandler(support.Matcher())
3105n/a listener = logging.handlers.QueueListener(self.queue, handler)
3106n/a listener.start()
3107n/a try:
3108n/a self.que_logger.warning(self.next_message())
3109n/a self.que_logger.error(self.next_message())
3110n/a self.que_logger.critical(self.next_message())
3111n/a finally:
3112n/a listener.stop()
3113n/a self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3114n/a self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3115n/a self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3116n/a handler.close()
3117n/a
3118n/a # Now test with respect_handler_level set
3119n/a
3120n/a handler = support.TestHandler(support.Matcher())
3121n/a handler.setLevel(logging.CRITICAL)
3122n/a listener = logging.handlers.QueueListener(self.queue, handler,
3123n/a respect_handler_level=True)
3124n/a listener.start()
3125n/a try:
3126n/a self.que_logger.warning(self.next_message())
3127n/a self.que_logger.error(self.next_message())
3128n/a self.que_logger.critical(self.next_message())
3129n/a finally:
3130n/a listener.stop()
3131n/a self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3132n/a self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3133n/a self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3134n/a
3135n/aif hasattr(logging.handlers, 'QueueListener'):
3136n/a import multiprocessing
3137n/a from unittest.mock import patch
3138n/a
3139n/a class QueueListenerTest(BaseTest):
3140n/a """
3141n/a Tests based on patch submitted for issue #27930. Ensure that
3142n/a QueueListener handles all log messages.
3143n/a """
3144n/a
3145n/a repeat = 20
3146n/a
3147n/a @staticmethod
3148n/a def setup_and_log(log_queue, ident):
3149n/a """
3150n/a Creates a logger with a QueueHandler that logs to a queue read by a
3151n/a QueueListener. Starts the listener, logs five messages, and stops
3152n/a the listener.
3153n/a """
3154n/a logger = logging.getLogger('test_logger_with_id_%s' % ident)
3155n/a logger.setLevel(logging.DEBUG)
3156n/a handler = logging.handlers.QueueHandler(log_queue)
3157n/a logger.addHandler(handler)
3158n/a listener = logging.handlers.QueueListener(log_queue)
3159n/a listener.start()
3160n/a
3161n/a logger.info('one')
3162n/a logger.info('two')
3163n/a logger.info('three')
3164n/a logger.info('four')
3165n/a logger.info('five')
3166n/a
3167n/a listener.stop()
3168n/a logger.removeHandler(handler)
3169n/a handler.close()
3170n/a
3171n/a @patch.object(logging.handlers.QueueListener, 'handle')
3172n/a def test_handle_called_with_queue_queue(self, mock_handle):
3173n/a for i in range(self.repeat):
3174n/a log_queue = queue.Queue()
3175n/a self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3176n/a self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3177n/a 'correct number of handled log messages')
3178n/a
3179n/a @support.requires_multiprocessing_queue
3180n/a @patch.object(logging.handlers.QueueListener, 'handle')
3181n/a def test_handle_called_with_mp_queue(self, mock_handle):
3182n/a for i in range(self.repeat):
3183n/a log_queue = multiprocessing.Queue()
3184n/a self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3185n/a self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3186n/a 'correct number of handled log messages')
3187n/a
3188n/a @staticmethod
3189n/a def get_all_from_queue(log_queue):
3190n/a try:
3191n/a while True:
3192n/a yield log_queue.get_nowait()
3193n/a except queue.Empty:
3194n/a return []
3195n/a
3196n/a @support.requires_multiprocessing_queue
3197n/a def test_no_messages_in_queue_after_stop(self):
3198n/a """
3199n/a Five messages are logged then the QueueListener is stopped. This
3200n/a test then gets everything off the queue. Failure of this test
3201n/a indicates that messages were not registered on the queue until
3202n/a _after_ the QueueListener stopped.
3203n/a """
3204n/a for i in range(self.repeat):
3205n/a queue = multiprocessing.Queue()
3206n/a self.setup_and_log(queue, '%s_%s' %(self.id(), i))
3207n/a # time.sleep(1)
3208n/a items = list(self.get_all_from_queue(queue))
3209n/a expected = [[], [logging.handlers.QueueListener._sentinel]]
3210n/a self.assertIn(items, expected,
3211n/a 'Found unexpected messages in queue: %s' % (
3212n/a [m.msg if isinstance(m, logging.LogRecord)
3213n/a else m for m in items]))
3214n/a
3215n/a
3216n/aZERO = datetime.timedelta(0)
3217n/a
3218n/aclass UTC(datetime.tzinfo):
3219n/a def utcoffset(self, dt):
3220n/a return ZERO
3221n/a
3222n/a dst = utcoffset
3223n/a
3224n/a def tzname(self, dt):
3225n/a return 'UTC'
3226n/a
3227n/autc = UTC()
3228n/a
3229n/aclass FormatterTest(unittest.TestCase):
3230n/a def setUp(self):
3231n/a self.common = {
3232n/a 'name': 'formatter.test',
3233n/a 'level': logging.DEBUG,
3234n/a 'pathname': os.path.join('path', 'to', 'dummy.ext'),
3235n/a 'lineno': 42,
3236n/a 'exc_info': None,
3237n/a 'func': None,
3238n/a 'msg': 'Message with %d %s',
3239n/a 'args': (2, 'placeholders'),
3240n/a }
3241n/a self.variants = {
3242n/a }
3243n/a
3244n/a def get_record(self, name=None):
3245n/a result = dict(self.common)
3246n/a if name is not None:
3247n/a result.update(self.variants[name])
3248n/a return logging.makeLogRecord(result)
3249n/a
3250n/a def test_percent(self):
3251n/a # Test %-formatting
3252n/a r = self.get_record()
3253n/a f = logging.Formatter('${%(message)s}')
3254n/a self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3255n/a f = logging.Formatter('%(random)s')
3256n/a self.assertRaises(KeyError, f.format, r)
3257n/a self.assertFalse(f.usesTime())
3258n/a f = logging.Formatter('%(asctime)s')
3259n/a self.assertTrue(f.usesTime())
3260n/a f = logging.Formatter('%(asctime)-15s')
3261n/a self.assertTrue(f.usesTime())
3262n/a f = logging.Formatter('asctime')
3263n/a self.assertFalse(f.usesTime())
3264n/a
3265n/a def test_braces(self):
3266n/a # Test {}-formatting
3267n/a r = self.get_record()
3268n/a f = logging.Formatter('$%{message}%$', style='{')
3269n/a self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3270n/a f = logging.Formatter('{random}', style='{')
3271n/a self.assertRaises(KeyError, f.format, r)
3272n/a self.assertFalse(f.usesTime())
3273n/a f = logging.Formatter('{asctime}', style='{')
3274n/a self.assertTrue(f.usesTime())
3275n/a f = logging.Formatter('{asctime!s:15}', style='{')
3276n/a self.assertTrue(f.usesTime())
3277n/a f = logging.Formatter('{asctime:15}', style='{')
3278n/a self.assertTrue(f.usesTime())
3279n/a f = logging.Formatter('asctime', style='{')
3280n/a self.assertFalse(f.usesTime())
3281n/a
3282n/a def test_dollars(self):
3283n/a # Test $-formatting
3284n/a r = self.get_record()
3285n/a f = logging.Formatter('$message', style='$')
3286n/a self.assertEqual(f.format(r), 'Message with 2 placeholders')
3287n/a f = logging.Formatter('$$%${message}%$$', style='$')
3288n/a self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3289n/a f = logging.Formatter('${random}', style='$')
3290n/a self.assertRaises(KeyError, f.format, r)
3291n/a self.assertFalse(f.usesTime())
3292n/a f = logging.Formatter('${asctime}', style='$')
3293n/a self.assertTrue(f.usesTime())
3294n/a f = logging.Formatter('${asctime', style='$')
3295n/a self.assertFalse(f.usesTime())
3296n/a f = logging.Formatter('$asctime', style='$')
3297n/a self.assertTrue(f.usesTime())
3298n/a f = logging.Formatter('asctime', style='$')
3299n/a self.assertFalse(f.usesTime())
3300n/a
3301n/a def test_invalid_style(self):
3302n/a self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
3303n/a
3304n/a def test_time(self):
3305n/a r = self.get_record()
3306n/a dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
3307n/a # We use None to indicate we want the local timezone
3308n/a # We're essentially converting a UTC time to local time
3309n/a r.created = time.mktime(dt.astimezone(None).timetuple())
3310n/a r.msecs = 123
3311n/a f = logging.Formatter('%(asctime)s %(message)s')
3312n/a f.converter = time.gmtime
3313n/a self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
3314n/a self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
3315n/a f.format(r)
3316n/a self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
3317n/a
3318n/aclass TestBufferingFormatter(logging.BufferingFormatter):
3319n/a def formatHeader(self, records):
3320n/a return '[(%d)' % len(records)
3321n/a
3322n/a def formatFooter(self, records):
3323n/a return '(%d)]' % len(records)
3324n/a
3325n/aclass BufferingFormatterTest(unittest.TestCase):
3326n/a def setUp(self):
3327n/a self.records = [
3328n/a logging.makeLogRecord({'msg': 'one'}),
3329n/a logging.makeLogRecord({'msg': 'two'}),
3330n/a ]
3331n/a
3332n/a def test_default(self):
3333n/a f = logging.BufferingFormatter()
3334n/a self.assertEqual('', f.format([]))
3335n/a self.assertEqual('onetwo', f.format(self.records))
3336n/a
3337n/a def test_custom(self):
3338n/a f = TestBufferingFormatter()
3339n/a self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
3340n/a lf = logging.Formatter('<%(message)s>')
3341n/a f = TestBufferingFormatter(lf)
3342n/a self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
3343n/a
3344n/aclass ExceptionTest(BaseTest):
3345n/a def test_formatting(self):
3346n/a r = self.root_logger
3347n/a h = RecordingHandler()
3348n/a r.addHandler(h)
3349n/a try:
3350n/a raise RuntimeError('deliberate mistake')
3351n/a except:
3352n/a logging.exception('failed', stack_info=True)
3353n/a r.removeHandler(h)
3354n/a h.close()
3355n/a r = h.records[0]
3356n/a self.assertTrue(r.exc_text.startswith('Traceback (most recent '
3357n/a 'call last):\n'))
3358n/a self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
3359n/a 'deliberate mistake'))
3360n/a self.assertTrue(r.stack_info.startswith('Stack (most recent '
3361n/a 'call last):\n'))
3362n/a self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
3363n/a 'stack_info=True)'))
3364n/a
3365n/a
3366n/aclass LastResortTest(BaseTest):
3367n/a def test_last_resort(self):
3368n/a # Test the last resort handler
3369n/a root = self.root_logger
3370n/a root.removeHandler(self.root_hdlr)
3371n/a old_lastresort = logging.lastResort
3372n/a old_raise_exceptions = logging.raiseExceptions
3373n/a
3374n/a try:
3375n/a with support.captured_stderr() as stderr:
3376n/a root.debug('This should not appear')
3377n/a self.assertEqual(stderr.getvalue(), '')
3378n/a root.warning('Final chance!')
3379n/a self.assertEqual(stderr.getvalue(), 'Final chance!\n')
3380n/a
3381n/a # No handlers and no last resort, so 'No handlers' message
3382n/a logging.lastResort = None
3383n/a with support.captured_stderr() as stderr:
3384n/a root.warning('Final chance!')
3385n/a msg = 'No handlers could be found for logger "root"\n'
3386n/a self.assertEqual(stderr.getvalue(), msg)
3387n/a
3388n/a # 'No handlers' message only printed once
3389n/a with support.captured_stderr() as stderr:
3390n/a root.warning('Final chance!')
3391n/a self.assertEqual(stderr.getvalue(), '')
3392n/a
3393n/a # If raiseExceptions is False, no message is printed
3394n/a root.manager.emittedNoHandlerWarning = False
3395n/a logging.raiseExceptions = False
3396n/a with support.captured_stderr() as stderr:
3397n/a root.warning('Final chance!')
3398n/a self.assertEqual(stderr.getvalue(), '')
3399n/a finally:
3400n/a root.addHandler(self.root_hdlr)
3401n/a logging.lastResort = old_lastresort
3402n/a logging.raiseExceptions = old_raise_exceptions
3403n/a
3404n/a
3405n/aclass FakeHandler:
3406n/a
3407n/a def __init__(self, identifier, called):
3408n/a for method in ('acquire', 'flush', 'close', 'release'):
3409n/a setattr(self, method, self.record_call(identifier, method, called))
3410n/a
3411n/a def record_call(self, identifier, method_name, called):
3412n/a def inner():
3413n/a called.append('{} - {}'.format(identifier, method_name))
3414n/a return inner
3415n/a
3416n/a
3417n/aclass RecordingHandler(logging.NullHandler):
3418n/a
3419n/a def __init__(self, *args, **kwargs):
3420n/a super(RecordingHandler, self).__init__(*args, **kwargs)
3421n/a self.records = []
3422n/a
3423n/a def handle(self, record):
3424n/a """Keep track of all the emitted records."""
3425n/a self.records.append(record)
3426n/a
3427n/a
3428n/aclass ShutdownTest(BaseTest):
3429n/a
3430n/a """Test suite for the shutdown method."""
3431n/a
3432n/a def setUp(self):
3433n/a super(ShutdownTest, self).setUp()
3434n/a self.called = []
3435n/a
3436n/a raise_exceptions = logging.raiseExceptions
3437n/a self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
3438n/a
3439n/a def raise_error(self, error):
3440n/a def inner():
3441n/a raise error()
3442n/a return inner
3443n/a
3444n/a def test_no_failure(self):
3445n/a # create some fake handlers
3446n/a handler0 = FakeHandler(0, self.called)
3447n/a handler1 = FakeHandler(1, self.called)
3448n/a handler2 = FakeHandler(2, self.called)
3449n/a
3450n/a # create live weakref to those handlers
3451n/a handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
3452n/a
3453n/a logging.shutdown(handlerList=list(handlers))
3454n/a
3455n/a expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
3456n/a '1 - acquire', '1 - flush', '1 - close', '1 - release',
3457n/a '0 - acquire', '0 - flush', '0 - close', '0 - release']
3458n/a self.assertEqual(expected, self.called)
3459n/a
3460n/a def _test_with_failure_in_method(self, method, error):
3461n/a handler = FakeHandler(0, self.called)
3462n/a setattr(handler, method, self.raise_error(error))
3463n/a handlers = [logging.weakref.ref(handler)]
3464n/a
3465n/a logging.shutdown(handlerList=list(handlers))
3466n/a
3467n/a self.assertEqual('0 - release', self.called[-1])
3468n/a
3469n/a def test_with_ioerror_in_acquire(self):
3470n/a self._test_with_failure_in_method('acquire', OSError)
3471n/a
3472n/a def test_with_ioerror_in_flush(self):
3473n/a self._test_with_failure_in_method('flush', OSError)
3474n/a
3475n/a def test_with_ioerror_in_close(self):
3476n/a self._test_with_failure_in_method('close', OSError)
3477n/a
3478n/a def test_with_valueerror_in_acquire(self):
3479n/a self._test_with_failure_in_method('acquire', ValueError)
3480n/a
3481n/a def test_with_valueerror_in_flush(self):
3482n/a self._test_with_failure_in_method('flush', ValueError)
3483n/a
3484n/a def test_with_valueerror_in_close(self):
3485n/a self._test_with_failure_in_method('close', ValueError)
3486n/a
3487n/a def test_with_other_error_in_acquire_without_raise(self):
3488n/a logging.raiseExceptions = False
3489n/a self._test_with_failure_in_method('acquire', IndexError)
3490n/a
3491n/a def test_with_other_error_in_flush_without_raise(self):
3492n/a logging.raiseExceptions = False
3493n/a self._test_with_failure_in_method('flush', IndexError)
3494n/a
3495n/a def test_with_other_error_in_close_without_raise(self):
3496n/a logging.raiseExceptions = False
3497n/a self._test_with_failure_in_method('close', IndexError)
3498n/a
3499n/a def test_with_other_error_in_acquire_with_raise(self):
3500n/a logging.raiseExceptions = True
3501n/a self.assertRaises(IndexError, self._test_with_failure_in_method,
3502n/a 'acquire', IndexError)
3503n/a
3504n/a def test_with_other_error_in_flush_with_raise(self):
3505n/a logging.raiseExceptions = True
3506n/a self.assertRaises(IndexError, self._test_with_failure_in_method,
3507n/a 'flush', IndexError)
3508n/a
3509n/a def test_with_other_error_in_close_with_raise(self):
3510n/a logging.raiseExceptions = True
3511n/a self.assertRaises(IndexError, self._test_with_failure_in_method,
3512n/a 'close', IndexError)
3513n/a
3514n/a
3515n/aclass ModuleLevelMiscTest(BaseTest):
3516n/a
3517n/a """Test suite for some module level methods."""
3518n/a
3519n/a def test_disable(self):
3520n/a old_disable = logging.root.manager.disable
3521n/a # confirm our assumptions are correct
3522n/a self.assertEqual(old_disable, 0)
3523n/a self.addCleanup(logging.disable, old_disable)
3524n/a
3525n/a logging.disable(83)
3526n/a self.assertEqual(logging.root.manager.disable, 83)
3527n/a
3528n/a # test the default value introduced in 3.7
3529n/a # (Issue #28524)
3530n/a logging.disable()
3531n/a self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
3532n/a
3533n/a def _test_log(self, method, level=None):
3534n/a called = []
3535n/a support.patch(self, logging, 'basicConfig',
3536n/a lambda *a, **kw: called.append((a, kw)))
3537n/a
3538n/a recording = RecordingHandler()
3539n/a logging.root.addHandler(recording)
3540n/a
3541n/a log_method = getattr(logging, method)
3542n/a if level is not None:
3543n/a log_method(level, "test me: %r", recording)
3544n/a else:
3545n/a log_method("test me: %r", recording)
3546n/a
3547n/a self.assertEqual(len(recording.records), 1)
3548n/a record = recording.records[0]
3549n/a self.assertEqual(record.getMessage(), "test me: %r" % recording)
3550n/a
3551n/a expected_level = level if level is not None else getattr(logging, method.upper())
3552n/a self.assertEqual(record.levelno, expected_level)
3553n/a
3554n/a # basicConfig was not called!
3555n/a self.assertEqual(called, [])
3556n/a
3557n/a def test_log(self):
3558n/a self._test_log('log', logging.ERROR)
3559n/a
3560n/a def test_debug(self):
3561n/a self._test_log('debug')
3562n/a
3563n/a def test_info(self):
3564n/a self._test_log('info')
3565n/a
3566n/a def test_warning(self):
3567n/a self._test_log('warning')
3568n/a
3569n/a def test_error(self):
3570n/a self._test_log('error')
3571n/a
3572n/a def test_critical(self):
3573n/a self._test_log('critical')
3574n/a
3575n/a def test_set_logger_class(self):
3576n/a self.assertRaises(TypeError, logging.setLoggerClass, object)
3577n/a
3578n/a class MyLogger(logging.Logger):
3579n/a pass
3580n/a
3581n/a logging.setLoggerClass(MyLogger)
3582n/a self.assertEqual(logging.getLoggerClass(), MyLogger)
3583n/a
3584n/a logging.setLoggerClass(logging.Logger)
3585n/a self.assertEqual(logging.getLoggerClass(), logging.Logger)
3586n/a
3587n/a @support.requires_type_collecting
3588n/a def test_logging_at_shutdown(self):
3589n/a # Issue #20037
3590n/a code = """if 1:
3591n/a import logging
3592n/a
3593n/a class A:
3594n/a def __del__(self):
3595n/a try:
3596n/a raise ValueError("some error")
3597n/a except Exception:
3598n/a logging.exception("exception in __del__")
3599n/a
3600n/a a = A()"""
3601n/a rc, out, err = assert_python_ok("-c", code)
3602n/a err = err.decode()
3603n/a self.assertIn("exception in __del__", err)
3604n/a self.assertIn("ValueError: some error", err)
3605n/a
3606n/a
3607n/aclass LogRecordTest(BaseTest):
3608n/a def test_str_rep(self):
3609n/a r = logging.makeLogRecord({})
3610n/a s = str(r)
3611n/a self.assertTrue(s.startswith('<LogRecord: '))
3612n/a self.assertTrue(s.endswith('>'))
3613n/a
3614n/a def test_dict_arg(self):
3615n/a h = RecordingHandler()
3616n/a r = logging.getLogger()
3617n/a r.addHandler(h)
3618n/a d = {'less' : 'more' }
3619n/a logging.warning('less is %(less)s', d)
3620n/a self.assertIs(h.records[0].args, d)
3621n/a self.assertEqual(h.records[0].message, 'less is more')
3622n/a r.removeHandler(h)
3623n/a h.close()
3624n/a
3625n/a def test_multiprocessing(self):
3626n/a r = logging.makeLogRecord({})
3627n/a self.assertEqual(r.processName, 'MainProcess')
3628n/a try:
3629n/a import multiprocessing as mp
3630n/a r = logging.makeLogRecord({})
3631n/a self.assertEqual(r.processName, mp.current_process().name)
3632n/a except ImportError:
3633n/a pass
3634n/a
3635n/a def test_optional(self):
3636n/a r = logging.makeLogRecord({})
3637n/a NOT_NONE = self.assertIsNotNone
3638n/a if threading:
3639n/a NOT_NONE(r.thread)
3640n/a NOT_NONE(r.threadName)
3641n/a NOT_NONE(r.process)
3642n/a NOT_NONE(r.processName)
3643n/a log_threads = logging.logThreads
3644n/a log_processes = logging.logProcesses
3645n/a log_multiprocessing = logging.logMultiprocessing
3646n/a try:
3647n/a logging.logThreads = False
3648n/a logging.logProcesses = False
3649n/a logging.logMultiprocessing = False
3650n/a r = logging.makeLogRecord({})
3651n/a NONE = self.assertIsNone
3652n/a NONE(r.thread)
3653n/a NONE(r.threadName)
3654n/a NONE(r.process)
3655n/a NONE(r.processName)
3656n/a finally:
3657n/a logging.logThreads = log_threads
3658n/a logging.logProcesses = log_processes
3659n/a logging.logMultiprocessing = log_multiprocessing
3660n/a
3661n/aclass BasicConfigTest(unittest.TestCase):
3662n/a
3663n/a """Test suite for logging.basicConfig."""
3664n/a
3665n/a def setUp(self):
3666n/a super(BasicConfigTest, self).setUp()
3667n/a self.handlers = logging.root.handlers
3668n/a self.saved_handlers = logging._handlers.copy()
3669n/a self.saved_handler_list = logging._handlerList[:]
3670n/a self.original_logging_level = logging.root.level
3671n/a self.addCleanup(self.cleanup)
3672n/a logging.root.handlers = []
3673n/a
3674n/a def tearDown(self):
3675n/a for h in logging.root.handlers[:]:
3676n/a logging.root.removeHandler(h)
3677n/a h.close()
3678n/a super(BasicConfigTest, self).tearDown()
3679n/a
3680n/a def cleanup(self):
3681n/a setattr(logging.root, 'handlers', self.handlers)
3682n/a logging._handlers.clear()
3683n/a logging._handlers.update(self.saved_handlers)
3684n/a logging._handlerList[:] = self.saved_handler_list
3685n/a logging.root.level = self.original_logging_level
3686n/a
3687n/a def test_no_kwargs(self):
3688n/a logging.basicConfig()
3689n/a
3690n/a # handler defaults to a StreamHandler to sys.stderr
3691n/a self.assertEqual(len(logging.root.handlers), 1)
3692n/a handler = logging.root.handlers[0]
3693n/a self.assertIsInstance(handler, logging.StreamHandler)
3694n/a self.assertEqual(handler.stream, sys.stderr)
3695n/a
3696n/a formatter = handler.formatter
3697n/a # format defaults to logging.BASIC_FORMAT
3698n/a self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
3699n/a # datefmt defaults to None
3700n/a self.assertIsNone(formatter.datefmt)
3701n/a # style defaults to %
3702n/a self.assertIsInstance(formatter._style, logging.PercentStyle)
3703n/a
3704n/a # level is not explicitly set
3705n/a self.assertEqual(logging.root.level, self.original_logging_level)
3706n/a
3707n/a def test_strformatstyle(self):
3708n/a with support.captured_stdout() as output:
3709n/a logging.basicConfig(stream=sys.stdout, style="{")
3710n/a logging.error("Log an error")
3711n/a sys.stdout.seek(0)
3712n/a self.assertEqual(output.getvalue().strip(),
3713n/a "ERROR:root:Log an error")
3714n/a
3715n/a def test_stringtemplatestyle(self):
3716n/a with support.captured_stdout() as output:
3717n/a logging.basicConfig(stream=sys.stdout, style="$")
3718n/a logging.error("Log an error")
3719n/a sys.stdout.seek(0)
3720n/a self.assertEqual(output.getvalue().strip(),
3721n/a "ERROR:root:Log an error")
3722n/a
3723n/a def test_filename(self):
3724n/a
3725n/a def cleanup(h1, h2, fn):
3726n/a h1.close()
3727n/a h2.close()
3728n/a os.remove(fn)
3729n/a
3730n/a logging.basicConfig(filename='test.log')
3731n/a
3732n/a self.assertEqual(len(logging.root.handlers), 1)
3733n/a handler = logging.root.handlers[0]
3734n/a self.assertIsInstance(handler, logging.FileHandler)
3735n/a
3736n/a expected = logging.FileHandler('test.log', 'a')
3737n/a self.assertEqual(handler.stream.mode, expected.stream.mode)
3738n/a self.assertEqual(handler.stream.name, expected.stream.name)
3739n/a self.addCleanup(cleanup, handler, expected, 'test.log')
3740n/a
3741n/a def test_filemode(self):
3742n/a
3743n/a def cleanup(h1, h2, fn):
3744n/a h1.close()
3745n/a h2.close()
3746n/a os.remove(fn)
3747n/a
3748n/a logging.basicConfig(filename='test.log', filemode='wb')
3749n/a
3750n/a handler = logging.root.handlers[0]
3751n/a expected = logging.FileHandler('test.log', 'wb')
3752n/a self.assertEqual(handler.stream.mode, expected.stream.mode)
3753n/a self.addCleanup(cleanup, handler, expected, 'test.log')
3754n/a
3755n/a def test_stream(self):
3756n/a stream = io.StringIO()
3757n/a self.addCleanup(stream.close)
3758n/a logging.basicConfig(stream=stream)
3759n/a
3760n/a self.assertEqual(len(logging.root.handlers), 1)
3761n/a handler = logging.root.handlers[0]
3762n/a self.assertIsInstance(handler, logging.StreamHandler)
3763n/a self.assertEqual(handler.stream, stream)
3764n/a
3765n/a def test_format(self):
3766n/a logging.basicConfig(format='foo')
3767n/a
3768n/a formatter = logging.root.handlers[0].formatter
3769n/a self.assertEqual(formatter._style._fmt, 'foo')
3770n/a
3771n/a def test_datefmt(self):
3772n/a logging.basicConfig(datefmt='bar')
3773n/a
3774n/a formatter = logging.root.handlers[0].formatter
3775n/a self.assertEqual(formatter.datefmt, 'bar')
3776n/a
3777n/a def test_style(self):
3778n/a logging.basicConfig(style='$')
3779n/a
3780n/a formatter = logging.root.handlers[0].formatter
3781n/a self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
3782n/a
3783n/a def test_level(self):
3784n/a old_level = logging.root.level
3785n/a self.addCleanup(logging.root.setLevel, old_level)
3786n/a
3787n/a logging.basicConfig(level=57)
3788n/a self.assertEqual(logging.root.level, 57)
3789n/a # Test that second call has no effect
3790n/a logging.basicConfig(level=58)
3791n/a self.assertEqual(logging.root.level, 57)
3792n/a
3793n/a def test_incompatible(self):
3794n/a assertRaises = self.assertRaises
3795n/a handlers = [logging.StreamHandler()]
3796n/a stream = sys.stderr
3797n/a assertRaises(ValueError, logging.basicConfig, filename='test.log',
3798n/a stream=stream)
3799n/a assertRaises(ValueError, logging.basicConfig, filename='test.log',
3800n/a handlers=handlers)
3801n/a assertRaises(ValueError, logging.basicConfig, stream=stream,
3802n/a handlers=handlers)
3803n/a # Issue 23207: test for invalid kwargs
3804n/a assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
3805n/a # Should pop both filename and filemode even if filename is None
3806n/a logging.basicConfig(filename=None, filemode='a')
3807n/a
3808n/a def test_handlers(self):
3809n/a handlers = [
3810n/a logging.StreamHandler(),
3811n/a logging.StreamHandler(sys.stdout),
3812n/a logging.StreamHandler(),
3813n/a ]
3814n/a f = logging.Formatter()
3815n/a handlers[2].setFormatter(f)
3816n/a logging.basicConfig(handlers=handlers)
3817n/a self.assertIs(handlers[0], logging.root.handlers[0])
3818n/a self.assertIs(handlers[1], logging.root.handlers[1])
3819n/a self.assertIs(handlers[2], logging.root.handlers[2])
3820n/a self.assertIsNotNone(handlers[0].formatter)
3821n/a self.assertIsNotNone(handlers[1].formatter)
3822n/a self.assertIs(handlers[2].formatter, f)
3823n/a self.assertIs(handlers[0].formatter, handlers[1].formatter)
3824n/a
3825n/a def _test_log(self, method, level=None):
3826n/a # logging.root has no handlers so basicConfig should be called
3827n/a called = []
3828n/a
3829n/a old_basic_config = logging.basicConfig
3830n/a def my_basic_config(*a, **kw):
3831n/a old_basic_config()
3832n/a old_level = logging.root.level
3833n/a logging.root.setLevel(100) # avoid having messages in stderr
3834n/a self.addCleanup(logging.root.setLevel, old_level)
3835n/a called.append((a, kw))
3836n/a
3837n/a support.patch(self, logging, 'basicConfig', my_basic_config)
3838n/a
3839n/a log_method = getattr(logging, method)
3840n/a if level is not None:
3841n/a log_method(level, "test me")
3842n/a else:
3843n/a log_method("test me")
3844n/a
3845n/a # basicConfig was called with no arguments
3846n/a self.assertEqual(called, [((), {})])
3847n/a
3848n/a def test_log(self):
3849n/a self._test_log('log', logging.WARNING)
3850n/a
3851n/a def test_debug(self):
3852n/a self._test_log('debug')
3853n/a
3854n/a def test_info(self):
3855n/a self._test_log('info')
3856n/a
3857n/a def test_warning(self):
3858n/a self._test_log('warning')
3859n/a
3860n/a def test_error(self):
3861n/a self._test_log('error')
3862n/a
3863n/a def test_critical(self):
3864n/a self._test_log('critical')
3865n/a
3866n/a
3867n/aclass LoggerAdapterTest(unittest.TestCase):
3868n/a
3869n/a def setUp(self):
3870n/a super(LoggerAdapterTest, self).setUp()
3871n/a old_handler_list = logging._handlerList[:]
3872n/a
3873n/a self.recording = RecordingHandler()
3874n/a self.logger = logging.root
3875n/a self.logger.addHandler(self.recording)
3876n/a self.addCleanup(self.logger.removeHandler, self.recording)
3877n/a self.addCleanup(self.recording.close)
3878n/a
3879n/a def cleanup():
3880n/a logging._handlerList[:] = old_handler_list
3881n/a
3882n/a self.addCleanup(cleanup)
3883n/a self.addCleanup(logging.shutdown)
3884n/a self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
3885n/a
3886n/a def test_exception(self):
3887n/a msg = 'testing exception: %r'
3888n/a exc = None
3889n/a try:
3890n/a 1 / 0
3891n/a except ZeroDivisionError as e:
3892n/a exc = e
3893n/a self.adapter.exception(msg, self.recording)
3894n/a
3895n/a self.assertEqual(len(self.recording.records), 1)
3896n/a record = self.recording.records[0]
3897n/a self.assertEqual(record.levelno, logging.ERROR)
3898n/a self.assertEqual(record.msg, msg)
3899n/a self.assertEqual(record.args, (self.recording,))
3900n/a self.assertEqual(record.exc_info,
3901n/a (exc.__class__, exc, exc.__traceback__))
3902n/a
3903n/a def test_exception_excinfo(self):
3904n/a try:
3905n/a 1 / 0
3906n/a except ZeroDivisionError as e:
3907n/a exc = e
3908n/a
3909n/a self.adapter.exception('exc_info test', exc_info=exc)
3910n/a
3911n/a self.assertEqual(len(self.recording.records), 1)
3912n/a record = self.recording.records[0]
3913n/a self.assertEqual(record.exc_info,
3914n/a (exc.__class__, exc, exc.__traceback__))
3915n/a
3916n/a def test_critical(self):
3917n/a msg = 'critical test! %r'
3918n/a self.adapter.critical(msg, self.recording)
3919n/a
3920n/a self.assertEqual(len(self.recording.records), 1)
3921n/a record = self.recording.records[0]
3922n/a self.assertEqual(record.levelno, logging.CRITICAL)
3923n/a self.assertEqual(record.msg, msg)
3924n/a self.assertEqual(record.args, (self.recording,))
3925n/a
3926n/a def test_is_enabled_for(self):
3927n/a old_disable = self.adapter.logger.manager.disable
3928n/a self.adapter.logger.manager.disable = 33
3929n/a self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
3930n/a old_disable)
3931n/a self.assertFalse(self.adapter.isEnabledFor(32))
3932n/a
3933n/a def test_has_handlers(self):
3934n/a self.assertTrue(self.adapter.hasHandlers())
3935n/a
3936n/a for handler in self.logger.handlers:
3937n/a self.logger.removeHandler(handler)
3938n/a
3939n/a self.assertFalse(self.logger.hasHandlers())
3940n/a self.assertFalse(self.adapter.hasHandlers())
3941n/a
3942n/a
3943n/aclass LoggerTest(BaseTest):
3944n/a
3945n/a def setUp(self):
3946n/a super(LoggerTest, self).setUp()
3947n/a self.recording = RecordingHandler()
3948n/a self.logger = logging.Logger(name='blah')
3949n/a self.logger.addHandler(self.recording)
3950n/a self.addCleanup(self.logger.removeHandler, self.recording)
3951n/a self.addCleanup(self.recording.close)
3952n/a self.addCleanup(logging.shutdown)
3953n/a
3954n/a def test_set_invalid_level(self):
3955n/a self.assertRaises(TypeError, self.logger.setLevel, object())
3956n/a
3957n/a def test_exception(self):
3958n/a msg = 'testing exception: %r'
3959n/a exc = None
3960n/a try:
3961n/a 1 / 0
3962n/a except ZeroDivisionError as e:
3963n/a exc = e
3964n/a self.logger.exception(msg, self.recording)
3965n/a
3966n/a self.assertEqual(len(self.recording.records), 1)
3967n/a record = self.recording.records[0]
3968n/a self.assertEqual(record.levelno, logging.ERROR)
3969n/a self.assertEqual(record.msg, msg)
3970n/a self.assertEqual(record.args, (self.recording,))
3971n/a self.assertEqual(record.exc_info,
3972n/a (exc.__class__, exc, exc.__traceback__))
3973n/a
3974n/a def test_log_invalid_level_with_raise(self):
3975n/a with support.swap_attr(logging, 'raiseExceptions', True):
3976n/a self.assertRaises(TypeError, self.logger.log, '10', 'test message')
3977n/a
3978n/a def test_log_invalid_level_no_raise(self):
3979n/a with support.swap_attr(logging, 'raiseExceptions', False):
3980n/a self.logger.log('10', 'test message') # no exception happens
3981n/a
3982n/a def test_find_caller_with_stack_info(self):
3983n/a called = []
3984n/a support.patch(self, logging.traceback, 'print_stack',
3985n/a lambda f, file: called.append(file.getvalue()))
3986n/a
3987n/a self.logger.findCaller(stack_info=True)
3988n/a
3989n/a self.assertEqual(len(called), 1)
3990n/a self.assertEqual('Stack (most recent call last):\n', called[0])
3991n/a
3992n/a def test_make_record_with_extra_overwrite(self):
3993n/a name = 'my record'
3994n/a level = 13
3995n/a fn = lno = msg = args = exc_info = func = sinfo = None
3996n/a rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
3997n/a exc_info, func, sinfo)
3998n/a
3999n/a for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
4000n/a extra = {key: 'some value'}
4001n/a self.assertRaises(KeyError, self.logger.makeRecord, name, level,
4002n/a fn, lno, msg, args, exc_info,
4003n/a extra=extra, sinfo=sinfo)
4004n/a
4005n/a def test_make_record_with_extra_no_overwrite(self):
4006n/a name = 'my record'
4007n/a level = 13
4008n/a fn = lno = msg = args = exc_info = func = sinfo = None
4009n/a extra = {'valid_key': 'some value'}
4010n/a result = self.logger.makeRecord(name, level, fn, lno, msg, args,
4011n/a exc_info, extra=extra, sinfo=sinfo)
4012n/a self.assertIn('valid_key', result.__dict__)
4013n/a
4014n/a def test_has_handlers(self):
4015n/a self.assertTrue(self.logger.hasHandlers())
4016n/a
4017n/a for handler in self.logger.handlers:
4018n/a self.logger.removeHandler(handler)
4019n/a self.assertFalse(self.logger.hasHandlers())
4020n/a
4021n/a def test_has_handlers_no_propagate(self):
4022n/a child_logger = logging.getLogger('blah.child')
4023n/a child_logger.propagate = False
4024n/a self.assertFalse(child_logger.hasHandlers())
4025n/a
4026n/a def test_is_enabled_for(self):
4027n/a old_disable = self.logger.manager.disable
4028n/a self.logger.manager.disable = 23
4029n/a self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
4030n/a self.assertFalse(self.logger.isEnabledFor(22))
4031n/a
4032n/a def test_root_logger_aliases(self):
4033n/a root = logging.getLogger()
4034n/a self.assertIs(root, logging.root)
4035n/a self.assertIs(root, logging.getLogger(None))
4036n/a self.assertIs(root, logging.getLogger(''))
4037n/a self.assertIs(root, logging.getLogger('foo').root)
4038n/a self.assertIs(root, logging.getLogger('foo.bar').root)
4039n/a self.assertIs(root, logging.getLogger('foo').parent)
4040n/a
4041n/a self.assertIsNot(root, logging.getLogger('\0'))
4042n/a self.assertIsNot(root, logging.getLogger('foo.bar').parent)
4043n/a
4044n/a def test_invalid_names(self):
4045n/a self.assertRaises(TypeError, logging.getLogger, any)
4046n/a self.assertRaises(TypeError, logging.getLogger, b'foo')
4047n/a
4048n/a
4049n/aclass BaseFileTest(BaseTest):
4050n/a "Base class for handler tests that write log files"
4051n/a
4052n/a def setUp(self):
4053n/a BaseTest.setUp(self)
4054n/a fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
4055n/a os.close(fd)
4056n/a self.rmfiles = []
4057n/a
4058n/a def tearDown(self):
4059n/a for fn in self.rmfiles:
4060n/a os.unlink(fn)
4061n/a if os.path.exists(self.fn):
4062n/a os.unlink(self.fn)
4063n/a BaseTest.tearDown(self)
4064n/a
4065n/a def assertLogFile(self, filename):
4066n/a "Assert a log file is there and register it for deletion"
4067n/a self.assertTrue(os.path.exists(filename),
4068n/a msg="Log file %r does not exist" % filename)
4069n/a self.rmfiles.append(filename)
4070n/a
4071n/a
4072n/aclass FileHandlerTest(BaseFileTest):
4073n/a def test_delay(self):
4074n/a os.unlink(self.fn)
4075n/a fh = logging.FileHandler(self.fn, delay=True)
4076n/a self.assertIsNone(fh.stream)
4077n/a self.assertFalse(os.path.exists(self.fn))
4078n/a fh.handle(logging.makeLogRecord({}))
4079n/a self.assertIsNotNone(fh.stream)
4080n/a self.assertTrue(os.path.exists(self.fn))
4081n/a fh.close()
4082n/a
4083n/aclass RotatingFileHandlerTest(BaseFileTest):
4084n/a def next_rec(self):
4085n/a return logging.LogRecord('n', logging.DEBUG, 'p', 1,
4086n/a self.next_message(), None, None, None)
4087n/a
4088n/a def test_should_not_rollover(self):
4089n/a # If maxbytes is zero rollover never occurs
4090n/a rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
4091n/a self.assertFalse(rh.shouldRollover(None))
4092n/a rh.close()
4093n/a
4094n/a def test_should_rollover(self):
4095n/a rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
4096n/a self.assertTrue(rh.shouldRollover(self.next_rec()))
4097n/a rh.close()
4098n/a
4099n/a def test_file_created(self):
4100n/a # checks that the file is created and assumes it was created
4101n/a # by us
4102n/a rh = logging.handlers.RotatingFileHandler(self.fn)
4103n/a rh.emit(self.next_rec())
4104n/a self.assertLogFile(self.fn)
4105n/a rh.close()
4106n/a
4107n/a def test_rollover_filenames(self):
4108n/a def namer(name):
4109n/a return name + ".test"
4110n/a rh = logging.handlers.RotatingFileHandler(
4111n/a self.fn, backupCount=2, maxBytes=1)
4112n/a rh.namer = namer
4113n/a rh.emit(self.next_rec())
4114n/a self.assertLogFile(self.fn)
4115n/a rh.emit(self.next_rec())
4116n/a self.assertLogFile(namer(self.fn + ".1"))
4117n/a rh.emit(self.next_rec())
4118n/a self.assertLogFile(namer(self.fn + ".2"))
4119n/a self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4120n/a rh.close()
4121n/a
4122n/a @support.requires_zlib
4123n/a def test_rotator(self):
4124n/a def namer(name):
4125n/a return name + ".gz"
4126n/a
4127n/a def rotator(source, dest):
4128n/a with open(source, "rb") as sf:
4129n/a data = sf.read()
4130n/a compressed = zlib.compress(data, 9)
4131n/a with open(dest, "wb") as df:
4132n/a df.write(compressed)
4133n/a os.remove(source)
4134n/a
4135n/a rh = logging.handlers.RotatingFileHandler(
4136n/a self.fn, backupCount=2, maxBytes=1)
4137n/a rh.rotator = rotator
4138n/a rh.namer = namer
4139n/a m1 = self.next_rec()
4140n/a rh.emit(m1)
4141n/a self.assertLogFile(self.fn)
4142n/a m2 = self.next_rec()
4143n/a rh.emit(m2)
4144n/a fn = namer(self.fn + ".1")
4145n/a self.assertLogFile(fn)
4146n/a newline = os.linesep
4147n/a with open(fn, "rb") as f:
4148n/a compressed = f.read()
4149n/a data = zlib.decompress(compressed)
4150n/a self.assertEqual(data.decode("ascii"), m1.msg + newline)
4151n/a rh.emit(self.next_rec())
4152n/a fn = namer(self.fn + ".2")
4153n/a self.assertLogFile(fn)
4154n/a with open(fn, "rb") as f:
4155n/a compressed = f.read()
4156n/a data = zlib.decompress(compressed)
4157n/a self.assertEqual(data.decode("ascii"), m1.msg + newline)
4158n/a rh.emit(self.next_rec())
4159n/a fn = namer(self.fn + ".2")
4160n/a with open(fn, "rb") as f:
4161n/a compressed = f.read()
4162n/a data = zlib.decompress(compressed)
4163n/a self.assertEqual(data.decode("ascii"), m2.msg + newline)
4164n/a self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4165n/a rh.close()
4166n/a
4167n/aclass TimedRotatingFileHandlerTest(BaseFileTest):
4168n/a # other test methods added below
4169n/a def test_rollover(self):
4170n/a fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
4171n/a backupCount=1)
4172n/a fmt = logging.Formatter('%(asctime)s %(message)s')
4173n/a fh.setFormatter(fmt)
4174n/a r1 = logging.makeLogRecord({'msg': 'testing - initial'})
4175n/a fh.emit(r1)
4176n/a self.assertLogFile(self.fn)
4177n/a time.sleep(1.1) # a little over a second ...
4178n/a r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
4179n/a fh.emit(r2)
4180n/a fh.close()
4181n/a # At this point, we should have a recent rotated file which we
4182n/a # can test for the existence of. However, in practice, on some
4183n/a # machines which run really slowly, we don't know how far back
4184n/a # in time to go to look for the log file. So, we go back a fair
4185n/a # bit, and stop as soon as we see a rotated file. In theory this
4186n/a # could of course still fail, but the chances are lower.
4187n/a found = False
4188n/a now = datetime.datetime.now()
4189n/a GO_BACK = 5 * 60 # seconds
4190n/a for secs in range(GO_BACK):
4191n/a prev = now - datetime.timedelta(seconds=secs)
4192n/a fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
4193n/a found = os.path.exists(fn)
4194n/a if found:
4195n/a self.rmfiles.append(fn)
4196n/a break
4197n/a msg = 'No rotated files found, went back %d seconds' % GO_BACK
4198n/a if not found:
4199n/a #print additional diagnostics
4200n/a dn, fn = os.path.split(self.fn)
4201n/a files = [f for f in os.listdir(dn) if f.startswith(fn)]
4202n/a print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
4203n/a print('The only matching files are: %s' % files, file=sys.stderr)
4204n/a for f in files:
4205n/a print('Contents of %s:' % f)
4206n/a path = os.path.join(dn, f)
4207n/a with open(path, 'r') as tf:
4208n/a print(tf.read())
4209n/a self.assertTrue(found, msg=msg)
4210n/a
4211n/a def test_invalid(self):
4212n/a assertRaises = self.assertRaises
4213n/a assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4214n/a self.fn, 'X', delay=True)
4215n/a assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4216n/a self.fn, 'W', delay=True)
4217n/a assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4218n/a self.fn, 'W7', delay=True)
4219n/a
4220n/a def test_compute_rollover_daily_attime(self):
4221n/a currentTime = 0
4222n/a atTime = datetime.time(12, 0, 0)
4223n/a rh = logging.handlers.TimedRotatingFileHandler(
4224n/a self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
4225n/a atTime=atTime)
4226n/a try:
4227n/a actual = rh.computeRollover(currentTime)
4228n/a self.assertEqual(actual, currentTime + 12 * 60 * 60)
4229n/a
4230n/a actual = rh.computeRollover(currentTime + 13 * 60 * 60)
4231n/a self.assertEqual(actual, currentTime + 36 * 60 * 60)
4232n/a finally:
4233n/a rh.close()
4234n/a
4235n/a #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
4236n/a def test_compute_rollover_weekly_attime(self):
4237n/a currentTime = int(time.time())
4238n/a today = currentTime - currentTime % 86400
4239n/a
4240n/a atTime = datetime.time(12, 0, 0)
4241n/a
4242n/a wday = time.gmtime(today).tm_wday
4243n/a for day in range(7):
4244n/a rh = logging.handlers.TimedRotatingFileHandler(
4245n/a self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
4246n/a atTime=atTime)
4247n/a try:
4248n/a if wday > day:
4249n/a # The rollover day has already passed this week, so we
4250n/a # go over into next week
4251n/a expected = (7 - wday + day)
4252n/a else:
4253n/a expected = (day - wday)
4254n/a # At this point expected is in days from now, convert to seconds
4255n/a expected *= 24 * 60 * 60
4256n/a # Add in the rollover time
4257n/a expected += 12 * 60 * 60
4258n/a # Add in adjustment for today
4259n/a expected += today
4260n/a actual = rh.computeRollover(today)
4261n/a if actual != expected:
4262n/a print('failed in timezone: %d' % time.timezone)
4263n/a print('local vars: %s' % locals())
4264n/a self.assertEqual(actual, expected)
4265n/a if day == wday:
4266n/a # goes into following week
4267n/a expected += 7 * 24 * 60 * 60
4268n/a actual = rh.computeRollover(today + 13 * 60 * 60)
4269n/a if actual != expected:
4270n/a print('failed in timezone: %d' % time.timezone)
4271n/a print('local vars: %s' % locals())
4272n/a self.assertEqual(actual, expected)
4273n/a finally:
4274n/a rh.close()
4275n/a
4276n/a
4277n/adef secs(**kw):
4278n/a return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
4279n/a
4280n/afor when, exp in (('S', 1),
4281n/a ('M', 60),
4282n/a ('H', 60 * 60),
4283n/a ('D', 60 * 60 * 24),
4284n/a ('MIDNIGHT', 60 * 60 * 24),
4285n/a # current time (epoch start) is a Thursday, W0 means Monday
4286n/a ('W0', secs(days=4, hours=24)),
4287n/a ):
4288n/a def test_compute_rollover(self, when=when, exp=exp):
4289n/a rh = logging.handlers.TimedRotatingFileHandler(
4290n/a self.fn, when=when, interval=1, backupCount=0, utc=True)
4291n/a currentTime = 0.0
4292n/a actual = rh.computeRollover(currentTime)
4293n/a if exp != actual:
4294n/a # Failures occur on some systems for MIDNIGHT and W0.
4295n/a # Print detailed calculation for MIDNIGHT so we can try to see
4296n/a # what's going on
4297n/a if when == 'MIDNIGHT':
4298n/a try:
4299n/a if rh.utc:
4300n/a t = time.gmtime(currentTime)
4301n/a else:
4302n/a t = time.localtime(currentTime)
4303n/a currentHour = t[3]
4304n/a currentMinute = t[4]
4305n/a currentSecond = t[5]
4306n/a # r is the number of seconds left between now and midnight
4307n/a r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
4308n/a currentMinute) * 60 +
4309n/a currentSecond)
4310n/a result = currentTime + r
4311n/a print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
4312n/a print('currentHour: %s' % currentHour, file=sys.stderr)
4313n/a print('currentMinute: %s' % currentMinute, file=sys.stderr)
4314n/a print('currentSecond: %s' % currentSecond, file=sys.stderr)
4315n/a print('r: %s' % r, file=sys.stderr)
4316n/a print('result: %s' % result, file=sys.stderr)
4317n/a except Exception:
4318n/a print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
4319n/a self.assertEqual(exp, actual)
4320n/a rh.close()
4321n/a setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
4322n/a
4323n/a
4324n/a@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
4325n/aclass NTEventLogHandlerTest(BaseTest):
4326n/a def test_basic(self):
4327n/a logtype = 'Application'
4328n/a elh = win32evtlog.OpenEventLog(None, logtype)
4329n/a num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
4330n/a
4331n/a try:
4332n/a h = logging.handlers.NTEventLogHandler('test_logging')
4333n/a except pywintypes.error as e:
4334n/a if e.winerror == 5: # access denied
4335n/a raise unittest.SkipTest('Insufficient privileges to run test')
4336n/a raise
4337n/a
4338n/a r = logging.makeLogRecord({'msg': 'Test Log Message'})
4339n/a h.handle(r)
4340n/a h.close()
4341n/a # Now see if the event is recorded
4342n/a self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
4343n/a flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
4344n/a win32evtlog.EVENTLOG_SEQUENTIAL_READ
4345n/a found = False
4346n/a GO_BACK = 100
4347n/a events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
4348n/a for e in events:
4349n/a if e.SourceName != 'test_logging':
4350n/a continue
4351n/a msg = win32evtlogutil.SafeFormatMessage(e, logtype)
4352n/a if msg != 'Test Log Message\r\n':
4353n/a continue
4354n/a found = True
4355n/a break
4356n/a msg = 'Record not found in event log, went back %d records' % GO_BACK
4357n/a self.assertTrue(found, msg=msg)
4358n/a
4359n/a
4360n/aclass MiscTestCase(unittest.TestCase):
4361n/a def test__all__(self):
4362n/a blacklist = {'logThreads', 'logMultiprocessing',
4363n/a 'logProcesses', 'currentframe',
4364n/a 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
4365n/a 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
4366n/a 'root', 'threading'}
4367n/a support.check__all__(self, logging, blacklist=blacklist)
4368n/a
4369n/a
4370n/a# Set the locale to the platform-dependent default. I have no idea
4371n/a# why the test does this, but in any case we save the current locale
4372n/a# first and restore it at the end.
4373n/a@support.run_with_locale('LC_ALL', '')
4374n/adef test_main():
4375n/a tests = [
4376n/a BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
4377n/a HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
4378n/a DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
4379n/a ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
4380n/a StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
4381n/a QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
4382n/a LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
4383n/a RotatingFileHandlerTest, LastResortTest, LogRecordTest,
4384n/a ExceptionTest, SysLogHandlerTest, HTTPHandlerTest,
4385n/a NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
4386n/a UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
4387n/a MiscTestCase
4388n/a ]
4389n/a if hasattr(logging.handlers, 'QueueListener'):
4390n/a tests.append(QueueListenerTest)
4391n/a support.run_unittest(*tests)
4392n/a
4393n/aif __name__ == "__main__":
4394n/a test_main()