栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

logging模块详解

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

logging模块详解

logging简介

logging是python的内置库,主要用于进行格式化内容输出,可将格式化内容输出到文件,也可输出到屏幕。我们在开发过程中常用print函数来进行调试,但是实际应用部署时我们要将日志的信息要输出到文件中,方便后续查找以及备份。在我们使用日志管理时,我们还可以将日志格式化成json对象转存到ELK中方便图形化查看及管理。前面说的这些,我们都可以通过logging所包含的功能以及提供扩展的方式来完成。

logging工作流程

这是从网上查到一个关于python logging模块的工作流程图,非常遗憾没有找到出处。

可以看到图中,几个关键的类

Logger

用于记录日志的对象。

通过流程图可以看到

  1. 判断是否enabled,实质就是看记录的level(logger.info,logger.debug等)和当前logger对象设置的level是否满足(比如logger设置的lever是Info,记录时使用的logger.debug,那么就会不满足,所以不会记录日志)
  2. 查看logger的过滤器是否满足。filter通过之后,交给logger的handler来记录日志,一个logger是可以设置多个handler。
  3. 交给Handlers实际记录日志

注:在整个应用中可以有多个logger,使用logging.getLogger时通过指定name来获取对象,实际logging中还存在一个Manager类,由Manager来进行多logger的单例模式管理。

Handler

用于记录日志到具体的文件或者输出流或其他的管道。

  1. 查看记录日志是否满足过滤器
  2. 满足过滤器,按照设置的Formatter生成字符串
  3. 将内容写入到具体的文件或者输出流

不同的Handler可能有不同的处理,但是底层原理还是做这三件事情。

Filter

用于过滤用户记录日志时,是否满足记录标准

Formatter

用于格式化,输出的字符串格式

这是原生自带的格式

%(name)s     Name of the logger (logging channel)
    %(levelno)s  Numeric logging level for the message (DEBUG, INFO,
   WARNING, ERROR, CRITICAL)
    %(levelname)sText logging level for the message ("DEBUG", "INFO",
   "WARNING", "ERROR", "CRITICAL")
    %(pathname)s Full pathname of the source file where the logging
   call was issued (if available)
    %(filename)s Filename portion of pathname
    %(module)s   Module (name portion of filename)
    %(lineno)d   Source line number where the logging call was issued
   (if available)
    %(funcName)s Function name
    %(created)f  Time when the LogRecord was created (time.time()
   return value)
    %(asctime)s  Textual time when the LogRecord was created
    %(msecs)d    Millisecond portion of the creation time
    %(relativeCreated)d Time in milliseconds when the LogRecord was created,
   relative to the time the logging module was loaded
   (typically at application startup time)
    %(thread)d   Thread ID (if available)
    %(threadName)s      Thread name (if available)
    %(process)d  Process ID (if available)
    %(message)s  The result of record.getMessage(), computed just as
   the record is emitted
LogRecord

我们每一次的 logger.info logger.debug logger.error等实际上都是进行一次LogRecord的处理

包含一些基本的输出日志内容,以及内容中参数,还有附带的函数名称,行数等信息

logging源码阅读

这是我这边基于字典的一个logging配置

import logging
import logging.config
import os

basedir = os.path.abspath(os.path.dirname(__file__))
log_path = os.path.join(basedir, '..', 'logs')
service_name = "test_log"
if not os.path.isdir(log_path):
    os.mkdir(log_path)
    
class InfoFilter(logging.Filter):
    def filter(self, record):
 """
 只筛选出INFO级别的日志
 """
 if logging.INFO <= record.levelno < logging.ERROR:
     return super().filter(record)
 else:
     return 0


class ErrorFilter(logging.Filter):
    def filter(self, record):
 """
 只筛选出ERROR级别的日志
 """
 if logging.ERROR <= record.levelno < logging.CRITICAL:
     return super().filter(record)
 else:
     return 0

LOG_CONFIG_DICT = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
 'simple': {
     'class': 'logging.Formatter',
     'format': '%(asctime)s %(levelname)s %(name)s %(filename)s %(module)s %(funcName)s '
 '%(lineno)d %(thread)d %(threadName)s %(process)d %(processName)s %(message)s'
 },
 # json模式, 方便ELK收集处理
 'json': {
     'class': 'logging.Formatter',
     'format': '{"time:":"%(asctime)s","level":"%(levelname)s","logger_name":"%(name)s",'
 '"file_name":"%(filename)s","module":"%(module)s","func_name":"%(funcName)s",'
 '"line_number":"%(lineno)d","thread_id":"%(thread)d","thread_name":"%(threadName)s",'
 '"process_id":"%(process)d","process_name":"%(processName)s","message":"%(message)s"}'}
    },
    # 过滤器
    'filters': {
 'info_filter': {
     '()': InfoFilter
 },
 'error_filter': {
     '()': ErrorFilter
 }
    },
    # 处理器
    'handlers': {
 # 控制台输出
 'console': {
     'class': 'logging.StreamHandler',
     'level': 'INFO',
     'formatter': 'simple'
 },
 # info文件输出
 'info_file': {
     'level': 'INFO',
     'formatter': 'json',
     'class': 'logging.handlers.TimedRotatingFileHandler',
     'filename': '{0}/{1}_info.log'.format(log_path, service_name),
     'when': "d",
     'interval': 1,
     'encoding': 'utf8',
     'backupCount': 30,
     'filters': ['info_filter']
 },
 # error文件输出
 'error_file': {
     'level': 'ERROR',
     'formatter': 'json',
     'class': 'logging.handlers.TimedRotatingFileHandler',
     'filename': '{0}/{1}_error.log'.format(log_path, service_name),
     'when': "d",
     'interval': 1,
     'encoding': 'utf8',
     'backupCount': 30,
     'filters': ['error_filter']
 }
    },
    # 记录器
    'loggers': {
 'full_logger': {
     'handlers': ['console', 'info_file', 'error_file'],
     'level': 'INFO'
 },
 'only_console_logger': {
     'handlers': ['console'],
     'level': 'INFO'
 },
 'only_file_logger': {
     'handlers': ['info_file', 'error_file']
 }
    }
}

logging.config.dictConfig(LOG_CONFIG_DICT)

下面我们就基于config第一次配置时整个logging的工作原理。结合代码进行分析logging的工作之路。

我的python版本是Python3.6

从logging.config开始

dictConfigClass = DictConfigurator

def dictConfig(config):
    """Configure logging using a dictionary."""
    dictConfigClass(config).configure()

实质上是实例化logging.config.DictConfigurator类的对象,然后执行其configure方法。

class DictConfigurator(baseConfigurator):
    """
    Configure logging using a dictionary-like object to describe the
    configuration.
    """

    def configure(self):
 """Do the configuration."""

 config = self.config
 if 'version' not in config:
     raise ValueError("dictionary doesn't specify a version")
 if config['version'] != 1:
     raise ValueError("Unsupported version: %s" % config['version'])
 incremental = config.pop('incremental', False)
 EMPTY_DICT = {}
 logging._acquireLock()
 try:
     if incremental:
  handlers = config.get('handlers', EMPTY_DICT)
  for name in handlers:
      if name not in logging._handlers:
   raise ValueError('No handler found with '
      'name %r'  % name)
      else:
   try:
handler = logging._handlers[name]
handler_config = handlers[name]
level = handler_config.get('level', None)
if level:
    handler.setLevel(logging._checkLevel(level))
   except Exception as e:
raise ValueError('Unable to configure handler '
   '%r: %s' % (name, e))
  loggers = config.get('loggers', EMPTY_DICT)
  for name in loggers:
      try:
   self.configure_logger(name, loggers[name], True)
      except Exception as e:
   raise ValueError('Unable to configure logger '
      '%r: %s' % (name, e))
  root = config.get('root', None)
  if root:
      try:
   self.configure_root(root, True)
      except Exception as e:
   raise ValueError('Unable to configure root '
      'logger: %s' % e)
     else:
  disable_existing = config.pop('disable_existing_loggers', True)

  logging._handlers.clear()
  del logging._handlerList[:]

  # Do formatters first - they don't refer to anything else
  formatters = config.get('formatters', EMPTY_DICT)
  for name in formatters:
      try:
   formatters[name] = self.configure_formatter(
    formatters[name])
      except Exception as e:
   raise ValueError('Unable to configure '
      'formatter %r: %s' % (name, e))
  # Next, do filters - they don't refer to anything else, either
  filters = config.get('filters', EMPTY_DICT)
  for name in filters:
      try:
   filters[name] = self.configure_filter(filters[name])
      except Exception as e:
   raise ValueError('Unable to configure '
      'filter %r: %s' % (name, e))
  handlers = config.get('handlers', EMPTY_DICT)
  deferred = []
  for name in sorted(handlers):
      try:
   handler = self.configure_handler(handlers[name])
   handler.name = name
   handlers[name] = handler
      except Exception as e:
   if 'target not configured yet' in str(e):
deferred.append(name)
   else:
raise ValueError('Unable to configure handler '
   '%r: %s' % (name, e))

  # Now do any that were deferred
  for name in deferred:
      try:
   handler = self.configure_handler(handlers[name])
   handler.name = name
   handlers[name] = handler
      except Exception as e:
   raise ValueError('Unable to configure handler '
      '%r: %s' % (name, e))

  root = logging.root
  existing = list(root.manager.loggerDict.keys())

  existing.sort()

  child_loggers = []
  #now set up the new ones...
  loggers = config.get('loggers', EMPTY_DICT)
  for name in loggers:
      if name in existing:
   i = existing.index(name) + 1 # look after name
   prefixed = name + "."
   pflen = len(prefixed)
   num_existing = len(existing)
   while i < num_existing:
if existing[i][:pflen] == prefixed:
    child_loggers.append(existing[i])
i += 1
   existing.remove(name)
      try:
   self.configure_logger(name, loggers[name])
      except Exception as e:
   raise ValueError('Unable to configure logger '
      '%r: %s' % (name, e))

  _handle_existing_loggers(existing, child_loggers,
      disable_existing)

  # And finally, do the root logger
  root = config.get('root', None)
  if root:
      try:
   self.configure_root(root)
      except Exception as e:
   raise ValueError('Unable to configure root '
      'logger: %s' % e)
 finally:
     logging._releaseLock()

我们对incremental不进行设置,即使用全量的方式进行配置,对于所有的handler重置并处理

看下来基本上分为4步来走。

  1. 配置Formatter
  2. 配置Filter
  3. 配置Handler
  4. 配置Logger
Formatter配置

源码部分

    def configure_formatter(self, config):
 """Configure a formatter from a dictionary."""
 if '()' in config:
     factory = config['()'] # for use in exception handler
     try:
  result = self.configure_custom(config)
     except TypeError as te:
  if "'format'" not in str(te):
      raise
  config['fmt'] = config.pop('format')
  config['()'] = factory
  result = self.configure_custom(config)
 else:
     fmt = config.get('format', None)
     dfmt = config.get('datefmt', None)
     style = config.get('style', '%')
     cname = config.get('class', None)
     if not cname:
  c = logging.Formatter
     else:
  c = _resolve(cname)
     result = c(fmt, dfmt, style)
 return result

可以使用自己的Formatter,默认使用logging.Formatter

Filter配置
    def configure_filter(self, config):
 """Configure a filter from a dictionary."""
 if '()' in config:
     result = self.configure_custom(config)
 else:
     name = config.get('name', '')
     result = logging.Filter(name)
 return result

设置filter,可以参考我上面配置的字典,可以自己定义Filter也可以使用系统内置Filter

Handler配置
    def configure_handler(self, config):
 """Configure a handler from a dictionary."""
 config_copy = dict(config)  # for restoring in case of error
 formatter = config.pop('formatter', None)
 if formatter:
     try:
  formatter = self.config['formatters'][formatter]
     except Exception as e:
  raise ValueError('Unable to set formatter '
     '%r: %s' % (formatter, e))
 level = config.pop('level', None)
 filters = config.pop('filters', None)
 if '()' in config:
     c = config.pop('()')
     if not callable(c):
  c = self.resolve(c)
     factory = c
 else:
     cname = config.pop('class')
     klass = self.resolve(cname)
     #Special case for handler which refers to another handler
     if issubclass(klass, logging.handlers.MemoryHandler) and
  'target' in config:
  try:
      th = self.config['handlers'][config['target']]
      if not isinstance(th, logging.Handler):
   config.update(config_copy)  # restore for deferred cfg
   raise TypeError('target not configured yet')
      config['target'] = th
  except Exception as e:
      raise ValueError('Unable to set target handler '
  '%r: %s' % (config['target'], e))
     elif issubclass(klass, logging.handlers.SMTPHandler) and
  'mailhost' in config:
  config['mailhost'] = self.as_tuple(config['mailhost'])
     elif issubclass(klass, logging.handlers.SysLogHandler) and
  'address' in config:
  config['address'] = self.as_tuple(config['address'])
     factory = klass
 props = config.pop('.', None)
 kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
 try:
     result = factory(**kwargs)
 except TypeError as te:
     if "'stream'" not in str(te):
  raise
     kwargs['strm'] = kwargs.pop('stream')
     result = factory(**kwargs)
 if formatter:
     result.setFormatter(formatter)
 if level is not None:
     result.setLevel(logging._checkLevel(level))
 if filters:
     self.add_filters(result, filters)
 if props:
     for name, value in props.items():
  setattr(result, name, value)
 return result

可以看到,前面顺序先配置formatter和filter是因为这里需要引用的到

值得注意的是,我们初始化时传递的一个字典,在整个配置过程中,字典里面的值会随着我们每次的配置变化而变化,所以我们在每个元素配置之后,在使用上一个字典元素时,就是配置完成之后的元素,为了方便理解,将配置filter之前和配置filter之后,config中的filter变化列出来

config == > before filters {'info_filter': {'()': }, 'error_filter': {'()': }} 
config == > after filters {'info_filter': , 'error_filter': }

配置前是我们配置文件中的内容,配置完成之后filter已经是一组对象了,所以在配置handler时我们就可以直接使用对象add_filter了。

Logger配置
    def common_logger_config(self, logger, config, incremental=False):
 level = config.get('level', None)
 if level is not None:
     logger.setLevel(logging._checkLevel(level))
 if not incremental:
     #Remove any existing handlers
     for h in logger.handlers[:]:
  logger.removeHandler(h)
     handlers = config.get('handlers', None)
     if handlers:
  self.add_handlers(logger, handlers)
     filters = config.get('filters', None)
     if filters:
  self.add_filters(logger, filters)

    def configure_logger(self, name, config, incremental=False):
 """Configure a non-root logger from a dictionary."""
 logger = logging.getLogger(name)
 self.common_logger_config(logger, config, incremental)
 propagate = config.get('propagate', None)
 if propagate is not None:
     logger.propagate = propagate

这就比较容易理解了,将我们上面配置过的filters和handlers添加到我们的logger中。

这里需要注意的一点是logger = logging.getLogger(name),看下logger.getLogger源码

root = RootLogger(WARNING)
Logger.root = root
Logger.manager = Manager(Logger.root)
def getLogger(name=None):
    if name:
 return Logger.manager.getLogger(name)
    else:
 return root
class Manager(object):
    def __init__(self, rootnode):
 self.root = rootnode
 self.disable = 0
 self.emittedNoHandlerWarning = False
 self.loggerDict = {}
 self.loggerClass = None
 self.logRecordFactory = None

    def getLogger(self, name):
 rv = None
 if not isinstance(name, str):
     raise TypeError('A logger name must be a string')
 _acquireLock()
 try:
     if name in self.loggerDict:
  rv = self.loggerDict[name]
  if isinstance(rv, PlaceHolder):
      ph = rv
      rv = (self.loggerClass or _loggerClass)(name)
      rv.manager = self
      self.loggerDict[name] = rv
      self._fixupChildren(ph, rv)
      self._fixupParents(rv)
     else:
  rv = (self.loggerClass or _loggerClass)(name)
  rv.manager = self
  self.loggerDict[name] = rv
  self._fixupParents(rv)
 finally:
     _releaseLock()
 return rv

可以看到,logging使用Mangaer进行logger的单例管理

截止到这里,基本上我们使用前的准备,logging都替我们准备好了,下面就是我们的使用

获取logger并记录日志
class Logger(Filterer):
	
	def info(self, msg, *args, **kwargs):
 if self.isEnabledFor(INFO):
     self._log(INFO, msg, args, **kwargs)
    
    def error(self, msg, *args, **kwargs):
 if self.isEnabledFor(ERROR):
     self._log(ERROR, msg, args, **kwargs)
    
    def isEnabledFor(self, level):
 if self.manager.disable >= level:
     return False
 return level >= self.getEffectiveLevel()
	
    def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False):
 sinfo = None
 if _srcfile:
     try:
  fn, lno, func, sinfo = self.findCaller(stack_info)
     except ValueError: # pragma: no cover
  fn, lno, func = "(unknown file)", 0, "(unknown function)"
 else: # pragma: no cover
     fn, lno, func = "(unknown file)", 0, "(unknown function)"
 if exc_info:
     if isinstance(exc_info, baseException):
  exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
     elif not isinstance(exc_info, tuple):
  exc_info = sys.exc_info()
 record = self.makeRecord(self.name, level, fn, lno, msg, args,
     exc_info, func, extra, sinfo)
 self.handle(record)

    def handle(self, record):
 if (not self.disabled) and self.filter(record):
     self.callHandlers(record)
     
    def callHandlers(self, record):
 c = self
 found = 0
 while c:
     for hdlr in c.handlers:
  found = found + 1
  if record.levelno >= hdlr.level:
      hdlr.handle(record)
     if not c.propagate:
  c = None    #break out
     else:
  c = c.parent
 if (found == 0):
     if lastResort:
  if record.levelno >= lastResort.level:
      lastResort.handle(record)
     elif raiseExceptions and not self.manager.emittedNoHandlerWarning:
  sys.stderr.write("No handlers could be found for logger"
     " "%s"n" % self.name)
  self.manager.emittedNoHandlerWarning = True
class StreamHandler(Handler):

    terminator = 'n'

    def __init__(self, stream=None):
 """
 Initialize the handler.

 If stream is not specified, sys.stderr is used.
 """
 Handler.__init__(self)
 if stream is None:
     stream = sys.stderr
 self.stream = stream

    def flush(self):
 self.acquire()
 try:
     if self.stream and hasattr(self.stream, "flush"):
  self.stream.flush()
 finally:
     self.release()

    def emit(self, record):
 try:
     msg = self.format(record)
     stream = self.stream
     stream.write(msg)
     stream.write(self.terminator)
     self.flush()
 except Exception:
     self.handleError(record)

    def __repr__(self):
 level = getLevelName(self.level)
 name = getattr(self.stream, 'name', '')
 if name:
     name += ' '
 return '<%s %s(%s)>' % (self.__class__.__name__, name, level)

根据代码可以看到符合我们流程图看到的流程,我们细化一下就是

  1. 查看记录日志是否满足过滤器,然后准备logrecord中的信息并生成logrecord
  2. 将logrecord交给所有handlers处理
  3. 在handler中确定是否满足handler过滤器,满足的话按照配置的Formatter生成字符串
  4. 将内容写入到具体的文件或者输出流
logger = logging.getLogger("full_logger")

logger.info("111")
# 根据我们的配置 这里会输出到流中,以及记录到test_log_info.log文件中

logger.error("error")
# 根据我们的配置 这里会输出到流中,以及记录到test_log_error.info文件中

有兴趣的同学可以继续试试其他好玩的处理

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号