304 lines
8.4 KiB
Python
304 lines
8.4 KiB
Python
# Copyright (c) Twisted Matrix Laboratories.
|
|
# See LICENSE for details.
|
|
|
|
"""
|
|
Test cases for L{twisted.logger._format}.
|
|
"""
|
|
|
|
import sys
|
|
from io import BytesIO, TextIOWrapper
|
|
import logging as py_logging
|
|
from inspect import getsourcefile
|
|
|
|
from zope.interface.verify import verifyObject, BrokenMethodImplementation
|
|
|
|
from twisted.python.compat import _PY3, currentframe
|
|
from twisted.python.failure import Failure
|
|
from twisted.trial import unittest
|
|
|
|
from .._levels import LogLevel
|
|
from .._observer import ILogObserver
|
|
from .._stdlib import STDLibLogObserver
|
|
|
|
|
|
def nextLine():
|
|
"""
|
|
Retrive the file name and line number immediately after where this function
|
|
is called.
|
|
|
|
@return: the file name and line number
|
|
@rtype: 2-L{tuple} of L{str}, L{int}
|
|
"""
|
|
caller = currentframe(1)
|
|
return (getsourcefile(sys.modules[caller.f_globals['__name__']]),
|
|
caller.f_lineno + 1)
|
|
|
|
|
|
|
|
class STDLibLogObserverTests(unittest.TestCase):
|
|
"""
|
|
Tests for L{STDLibLogObserver}.
|
|
"""
|
|
|
|
def test_interface(self):
|
|
"""
|
|
L{STDLibLogObserver} is an L{ILogObserver}.
|
|
"""
|
|
observer = STDLibLogObserver()
|
|
try:
|
|
verifyObject(ILogObserver, observer)
|
|
except BrokenMethodImplementation as e:
|
|
self.fail(e)
|
|
|
|
|
|
def py_logger(self):
|
|
"""
|
|
Create a logging object we can use to test with.
|
|
|
|
@return: a stdlib-style logger
|
|
@rtype: L{StdlibLoggingContainer}
|
|
"""
|
|
logger = StdlibLoggingContainer()
|
|
self.addCleanup(logger.close)
|
|
return logger
|
|
|
|
|
|
def logEvent(self, *events):
|
|
"""
|
|
Send one or more events to Python's logging module, and
|
|
capture the emitted L{logging.LogRecord}s and output stream as
|
|
a string.
|
|
|
|
@param events: events
|
|
@type events: L{tuple} of L{dict}
|
|
|
|
@return: a tuple: (records, output)
|
|
@rtype: 2-tuple of (L{list} of L{logging.LogRecord}, L{bytes}.)
|
|
"""
|
|
pl = self.py_logger()
|
|
observer = STDLibLogObserver(
|
|
# Add 1 to default stack depth to skip *this* frame, since
|
|
# tests will want to know about their own frames.
|
|
stackDepth=STDLibLogObserver.defaultStackDepth + 1
|
|
)
|
|
for event in events:
|
|
observer(event)
|
|
return pl.bufferedHandler.records, pl.outputAsText()
|
|
|
|
|
|
def test_name(self):
|
|
"""
|
|
Logger name.
|
|
"""
|
|
records, output = self.logEvent({})
|
|
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(records[0].name, "twisted")
|
|
|
|
|
|
def test_levels(self):
|
|
"""
|
|
Log levels.
|
|
"""
|
|
levelMapping = {
|
|
None: py_logging.INFO, # Default
|
|
LogLevel.debug: py_logging.DEBUG,
|
|
LogLevel.info: py_logging.INFO,
|
|
LogLevel.warn: py_logging.WARNING,
|
|
LogLevel.error: py_logging.ERROR,
|
|
LogLevel.critical: py_logging.CRITICAL,
|
|
}
|
|
|
|
# Build a set of events for each log level
|
|
events = []
|
|
for level, pyLevel in levelMapping.items():
|
|
event = {}
|
|
|
|
# Set the log level on the event, except for default
|
|
if level is not None:
|
|
event["log_level"] = level
|
|
|
|
# Remember the Python log level we expect to see for this
|
|
# event (as an int)
|
|
event["py_levelno"] = int(pyLevel)
|
|
|
|
events.append(event)
|
|
|
|
records, output = self.logEvent(*events)
|
|
self.assertEqual(len(records), len(levelMapping))
|
|
|
|
# Check that each event has the correct level
|
|
for i in range(len(records)):
|
|
self.assertEqual(records[i].levelno, events[i]["py_levelno"])
|
|
|
|
|
|
def test_callerInfo(self):
|
|
"""
|
|
C{pathname}, C{lineno}, C{exc_info}, C{func} is set properly on
|
|
records.
|
|
"""
|
|
filename, logLine = nextLine()
|
|
records, output = self.logEvent({})
|
|
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(records[0].pathname, filename)
|
|
self.assertEqual(records[0].lineno, logLine)
|
|
self.assertIsNone(records[0].exc_info)
|
|
|
|
# Attribute "func" is missing from record, which is weird because it's
|
|
# documented.
|
|
# self.assertEqual(records[0].func, "test_callerInfo")
|
|
|
|
|
|
def test_basicFormat(self):
|
|
"""
|
|
Basic formattable event passes the format along correctly.
|
|
"""
|
|
event = dict(log_format="Hello, {who}!", who="dude")
|
|
records, output = self.logEvent(event)
|
|
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(str(records[0].msg), u"Hello, dude!")
|
|
self.assertEqual(records[0].args, ())
|
|
|
|
|
|
def test_basicFormatRendered(self):
|
|
"""
|
|
Basic formattable event renders correctly.
|
|
"""
|
|
event = dict(log_format="Hello, {who}!", who="dude")
|
|
records, output = self.logEvent(event)
|
|
|
|
self.assertEqual(len(records), 1)
|
|
self.assertTrue(output.endswith(u":Hello, dude!\n"),
|
|
repr(output))
|
|
|
|
|
|
def test_noFormat(self):
|
|
"""
|
|
Event with no format.
|
|
"""
|
|
records, output = self.logEvent({})
|
|
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(str(records[0].msg), "")
|
|
|
|
|
|
def test_failure(self):
|
|
"""
|
|
An event with a failure logs the failure details as well.
|
|
"""
|
|
def failing_func():
|
|
1 / 0
|
|
try:
|
|
failing_func()
|
|
except ZeroDivisionError:
|
|
failure = Failure()
|
|
event = dict(log_format='Hi mom', who='me', log_failure=failure)
|
|
records, output = self.logEvent(event)
|
|
self.assertEqual(len(records), 1)
|
|
self.assertIn(u'Hi mom', output)
|
|
self.assertIn(u'in failing_func', output)
|
|
self.assertIn(u'ZeroDivisionError', output)
|
|
|
|
|
|
def test_cleanedFailure(self):
|
|
"""
|
|
A cleaned Failure object has a fake traceback object; make sure that
|
|
logging such a failure still results in the exception details being
|
|
logged.
|
|
"""
|
|
def failing_func():
|
|
1 / 0
|
|
try:
|
|
failing_func()
|
|
except ZeroDivisionError:
|
|
failure = Failure()
|
|
failure.cleanFailure()
|
|
event = dict(log_format='Hi mom', who='me', log_failure=failure)
|
|
records, output = self.logEvent(event)
|
|
self.assertEqual(len(records), 1)
|
|
self.assertIn(u'Hi mom', output)
|
|
self.assertIn(u'in failing_func', output)
|
|
self.assertIn(u'ZeroDivisionError', output)
|
|
|
|
|
|
|
|
class StdlibLoggingContainer(object):
|
|
"""
|
|
Continer for a test configuration of stdlib logging objects.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.rootLogger = py_logging.getLogger("")
|
|
|
|
self.originalLevel = self.rootLogger.getEffectiveLevel()
|
|
self.rootLogger.setLevel(py_logging.DEBUG)
|
|
|
|
self.bufferedHandler = BufferedHandler()
|
|
self.rootLogger.addHandler(self.bufferedHandler)
|
|
|
|
self.streamHandler, self.output = handlerAndBytesIO()
|
|
self.rootLogger.addHandler(self.streamHandler)
|
|
|
|
|
|
def close(self):
|
|
"""
|
|
Close the logger.
|
|
"""
|
|
self.rootLogger.setLevel(self.originalLevel)
|
|
self.rootLogger.removeHandler(self.bufferedHandler)
|
|
self.rootLogger.removeHandler(self.streamHandler)
|
|
self.streamHandler.close()
|
|
self.output.close()
|
|
|
|
|
|
def outputAsText(self):
|
|
"""
|
|
Get the output to the underlying stream as text.
|
|
|
|
@return: the output text
|
|
@rtype: L{unicode}
|
|
"""
|
|
return self.output.getvalue().decode("utf-8")
|
|
|
|
|
|
|
|
def handlerAndBytesIO():
|
|
"""
|
|
Construct a 2-tuple of C{(StreamHandler, BytesIO)} for testing interaction
|
|
with the 'logging' module.
|
|
|
|
@return: handler and io object
|
|
@rtype: tuple of L{StreamHandler} and L{io.BytesIO}
|
|
"""
|
|
output = BytesIO()
|
|
stream = output
|
|
template = py_logging.BASIC_FORMAT
|
|
if _PY3:
|
|
stream = TextIOWrapper(output, encoding="utf-8", newline="\n")
|
|
formatter = py_logging.Formatter(template)
|
|
handler = py_logging.StreamHandler(stream)
|
|
handler.setFormatter(formatter)
|
|
return handler, output
|
|
|
|
|
|
|
|
class BufferedHandler(py_logging.Handler):
|
|
"""
|
|
A L{py_logging.Handler} that remembers all logged records in a list.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""
|
|
Initialize this L{BufferedHandler}.
|
|
"""
|
|
py_logging.Handler.__init__(self)
|
|
self.records = []
|
|
|
|
|
|
def emit(self, record):
|
|
"""
|
|
Remember the record.
|
|
"""
|
|
self.records.append(record)
|