How should I log while using multiprocessing in Python?
-
22-07-2019
Question
Right now I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware log: LOG = multiprocessing.get_logger(). Per the docs, this logger has process-shared locks so that you don't garble things up in sys.stderr (or whatever file handle) by having multiple processes writing to it simultaneously.
The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, let alone for all clients of the framework. Are there alternatives I'm not thinking of?
Solution
The only way to deal with this non-intrusively is to:
- Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
- Your controller process can then do one of the following:
- If using disk files: coalesce the log files at the end of the run, sorted by timestamp.
- If using pipes (recommended): coalesce log entries on-the-fly from all pipes into a central log file. (E.g., periodically select from the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat.) A sketch of this pipe-based variant follows below.
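Here is a minimal sketch of the pipe-based variant, assuming a Unix fork start method so that pipe file descriptors are inherited by the workers; the file name central.log and the message format are illustrative only:
import multiprocessing
import os
import select
import time

def worker(write_fd, worker_id):
    # Each worker logs to its own pipe; every entry carries a timestamp
    # so the controller can merge entries from all workers in order.
    with os.fdopen(write_fd, 'w') as out:
        for i in range(3):
            out.write('%f worker-%d message %d\n' % (time.time(), worker_id, i))
            out.flush()
            time.sleep(0.01)

def main():
    readers, procs = [], []
    for worker_id in range(4):
        read_fd, write_fd = os.pipe()
        p = multiprocessing.Process(target=worker, args=(write_fd, worker_id))
        p.start()
        os.close(write_fd)               # parent keeps only the read end
        readers.append(os.fdopen(read_fd))
        procs.append(p)
    entries = []
    while readers:
        # Block briefly on whichever pipes have data available.
        ready, _, _ = select.select(readers, [], [], 1.0)
        for r in ready:
            line = r.readline()
            if not line:                 # EOF: that worker closed its pipe
                readers.remove(r)
                r.close()
            else:
                entries.append(line)
    # Sort by the leading timestamp, then flush to the central log.
    # (A real implementation would merge incrementally rather than
    # sorting everything at the end.)
    entries.sort(key=lambda line: float(line.split(None, 1)[0]))
    with open('central.log', 'w') as log:
        log.writelines(entries)
    for p in procs:
        p.join()

if __name__ == '__main__':
    main()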
Other tips
I just now wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.
(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)
Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, github at https://github.com/jruere/multiprocessing-logging
Update: Implementation!
This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've now been using this in production for several months, and the current version below works without issue.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
Another option might be the various non-file-based logging handlers in the logging package:
SocketHandler
DatagramHandler
SyslogHandler
(and others)
This way, you could easily have a logging daemon somewhere that you can write to safely and that would handle the results correctly. (E.g., a simple socket server that just unpickles the message and emits it to its own rotating file handler.)
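A minimal sketch of such a daemon, adapted from the pattern in the Python logging cookbook (Python 3 module names are used; the file name central.log and the sizes are illustrative):
import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    # Receives length-prefixed pickled LogRecords, which is exactly the
    # wire format that logging.handlers.SocketHandler sends.
    def handle(self):
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            data = self.connection.recv(slen)
            while len(data) < slen:
                data += self.connection.recv(slen - len(data))
            record = logging.makeLogRecord(pickle.loads(data))
            self.server.file_handler.emit(record)

def serve(host='localhost',
          port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    server = socketserver.ThreadingTCPServer((host, port),
                                             LogRecordStreamHandler)
    server.file_handler = logging.handlers.RotatingFileHandler(
        'central.log', maxBytes=10 * 1024 * 1024, backupCount=5)
    server.serve_forever()

# Each worker process only needs a SocketHandler; pickling and framing
# are done by the stdlib.
def worker_init():
    handler = logging.handlers.SocketHandler(
        'localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.DEBUG)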
The SyslogHandler would take care of this for you, too. Of course, you could use your own instance of syslog, rather than the system one.
The Python logging cookbook has two complete examples here: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
It uses QueueHandler, which is new in Python 3.2 but easy to copy into your own code (as I did myself in Python 2.7) from: https://gist.github.com/vsajip/591589
Each process puts its logging on the Queue, and then a listener thread or process (one example of each is provided) picks those up and writes them all to a file, with no risk of corruption or garbling.
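If you are stuck on an older interpreter, a minimal backport sketch of QueueHandler might look like the following; it mirrors the shape of the stdlib class and the gist above, so treat it as an illustration rather than a drop-in copy:
import logging

class QueueHandler(logging.Handler):
    """Sketch of logging.handlers.QueueHandler (Python 3.2+) for older
    interpreters, following the gist linked above."""
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def prepare(self, record):
        # Render the message and drop unpicklable attributes before
        # the record crosses the process boundary.
        self.format(record)
        record.msg = record.message
        record.args = None
        record.exc_info = None
        return record
    def emit(self, record):
        try:
            self.queue.put_nowait(self.prepare(record))
        except Exception:
            self.handleError(record)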
Here is another variant, which keeps the logging and queue threads separate.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not reliable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
Here's yet another solution, with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! Python 3.2 or higher only.
import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random
def f(i):
time.sleep(random.uniform(.01, .05))
logging.info('function called with {} in worker thread.'.format(i))
time.sleep(random.uniform(.01, .05))
return i
def worker_init(q):
# all records from worker processes go to qh and then into q
qh = QueueHandler(q)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(qh)
def logger_init():
q = multiprocessing.Queue()
# this is the handler for all log records
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
# ql gets records from the queue and sends them to the handler
ql = QueueListener(q, handler)
ql.start()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# add the handler to the logger so records from this process are handled
logger.addHandler(handler)
return ql, q
def main():
q_listener, q = logger_init()
logging.info('hello from main thread')
pool = multiprocessing.Pool(4, worker_init, [q])
for result in pool.map(f, range(10)):
pass
pool.close()
pool.join()
q_listener.stop()
if __name__ == '__main__':
main()
All current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:
- You can use any logging configuration you want
- Logging is done in a daemon thread
- Safe shutdown of the daemon by using a context manager
- Communication with the logging thread is done by multiprocessing.Queue
- In subprocesses, logging.Logger (and already defined instances) are patched to send all records to the queue
- New: format the traceback and message before sending them to the queue to prevent pickling errors
Code with a usage example and output can be found at the following Gist: https://gist.github.com/schlamar/7003737
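The full implementation lives in the Gist; a rough sketch of the patching idea from the list above (the function name here is made up for illustration) could look like this:
import logging

def patch_loggers(queue):
    # Monkeypatch Logger.handle in the subprocess so that every record,
    # from every logger that exists or will exist, lands on the queue.
    def handle(self, record):
        record.msg = record.getMessage()   # format eagerly ...
        record.args = None                 # ... so nothing unpicklable
        record.exc_info = None             # crosses the process boundary
        queue.put(record)
    logging.Logger.handle = handle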
Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.
Moreover, the PyZMQ module, the Python bindings for ZMQ, implements PUBHandler, an object for publishing logging messages over a zmq.PUB socket.
There's a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adapted for working locally with multiple publishing processes.
import logging
import os
import socket
from logging import handlers
from multiprocessing import Process

import zmq
from zmq.log.handlers import PUBHandler

# config.PUBSUB_LOGGER_PORT, MAX_WORKERS_PER_CLIENT, stop_event and
# processes come from the surrounding application and are not shown here.

formatters = {
logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}
# This one will be used by publishing processes
class PUBLogger:
def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
self._logger = logging.getLogger(__name__)
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self.pub = self.ctx.socket(zmq.PUB)
self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
self._handler = PUBHandler(self.pub)
self._handler.formatters = formatters
self._logger.addHandler(self._handler)
@property
def logger(self):
return self._logger
# This one will be used by listener process
class SUBLogger:
def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
self.output_dir = output_dir
self._logger = logging.getLogger()
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self._sub = self.ctx.socket(zmq.SUB)
self._sub.bind('tcp://*:{1}'.format(ip, port))
self._sub.setsockopt(zmq.SUBSCRIBE, "")
handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
handler.setFormatter(formatter)
self._logger.addHandler(handler)
@property
def sub(self):
return self._sub
@property
def logger(self):
return self._logger
# And that's the way we actually run things:
# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
sub_logger = SUBLogger(ip)
while not event.is_set():
try:
topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
log_msg = getattr(logging, topic.lower())
log_msg(message)
except zmq.ZMQError as zmq_error:
if zmq_error.errno == zmq.EAGAIN:
pass
# Publisher processes loggers should be initialized as follows:
class Publisher:
def __init__(self, stop_event, proc_id):
self.stop_event = stop_event
self.proc_id = proc_id
self._logger = pub_logger.PUBLogger('127.0.0.1').logger
def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))
def run_worker(event, proc_id):
worker = Publisher(event, proc_id)
worker.run()
# Starting the subscriber process so we won't lose the publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                             args=('127.0.0.1', stop_event,))
sub_logger_process.start()
#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
processes.append(Process(target=run_worker,
args=(stop_event, i,)))
for p in processes:
p.start()
I also like zzzeek's answer, but Andre is correct that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling, which is somewhat expected. Implementing it turned out to be harder than I thought, particularly due to running on Windows, where there are some additional restrictions about global variables and such (see: How's Python Multiprocessing Implemented on Windows?)
But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also doesn't support setting the formatter or anything other than the root logger. Basically, you have to reinitialize the logger in each of the pool processes with the queue, and set up the other attributes on the logger.
Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue
class MultiProcessingLogHandler(logging.Handler):
def __init__(self, handler, queue, child=False):
logging.Handler.__init__(self)
self._handler = handler
self.queue = queue
# we only want one of the loggers to be pulling from the queue.
# If there is a way to do this without needing to be passed this
# information, that would be great!
if child == False:
self.shutdown = False
self.polltime = 1
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
#print "receive on"
while (self.shutdown == False) or (self.queue.empty() == False):
# so we block for a short period of time so that we can
# check for the shutdown cases.
try:
record = self.queue.get(True, self.polltime)
self._handler.emit(record)
except Queue.Empty, e:
pass
def send(self, s):
# send just puts it in the queue for the server to retrieve
self.queue.put(s)
def _format_record(self, record):
ei = record.exc_info
if ei:
dummy = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # to avoid Unpickleable error
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
time.sleep(self.polltime+1) # give some time for messages to enter the queue.
self.shutdown = True
time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown
def __del__(self):
self.close() # hopefully this aids in orderly shutdown when things are going poorly.
def f(x):
# just a logging command...
logging.critical('function number: ' + str(x))
# to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
time.sleep(x % 3)
def initPool(queue, level):
"""
This causes the logging module to be initialized with the necessary info
in pool threads to work correctly.
"""
logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
logging.getLogger('').setLevel(level)
if __name__ == '__main__':
stream = StringIO.StringIO()
logQueue = multiprocessing.Queue(100)
handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
logging.getLogger('').addHandler(handler)
logging.getLogger('').setLevel(logging.DEBUG)
logging.debug('starting main')
    # when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
pool.map(f, range(0,50))
pool.close()
logging.debug('done')
logging.shutdown()
print "stream output is:"
print stream.getvalue()
Just publish your logger instance somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing.
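A minimal sketch of that idea (the module name framework/log.py is made up for illustration):
# framework/log.py -- hypothetical module; the framework owns the
# multiprocessing-aware logger, and everyone else only asks this module
# for it, so no client ever imports multiprocessing itself.
import multiprocessing

_logger = None

def get_logger():
    global _logger
    if _logger is None:
        _logger = multiprocessing.get_logger()
    return _logger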
I liked zzzeek's answer. I would just substitute the pipe for a queue, since if multiple threads/processes use the same pipe end to generate log messages, they will get garbled.
How about delegating all the logging to another process that reads all log entries from a Queue?
LOG_QUEUE = multiprocessing.JoinableQueue()
class CentralLogger(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
        self.log = logging.getLogger('some_config')
self.log.info("Started Central Logging process")
def run(self):
while True:
log_level, message = self.queue.get()
if log_level is None:
self.log.info("Shutting down Central Logging process")
break
else:
self.log.log(log_level, message)
central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()
Simply share LOG_QUEUE via any of your multiprocess mechanisms, or even inheritance, and it all works out fine!
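The answer doesn't show the producer side; a minimal sketch, reusing LOG_QUEUE and CentralLogger from above (the tuple shape matches what run() expects), might be:
import logging
import multiprocessing

def worker(queue, worker_id):
    # Workers never touch handlers; they just enqueue (level, message)
    # tuples in the shape CentralLogger.run() expects.
    queue.put((logging.INFO, 'worker %d starting' % worker_id))
    queue.put((logging.WARNING, 'worker %d saw something odd' % worker_id))

workers = [multiprocessing.Process(target=worker, args=(LOG_QUEUE, i))
           for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
LOG_QUEUE.put((None, None))   # sentinel tells CentralLogger to shut down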
I have a solution that's similar to ironhacker's, except that in some of my code I use logging.exception, and I found that I needed to format the exception before passing it back over the Queue, since tracebacks aren't pickle'able:
import logging, traceback, cStringIO

class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
if record.exc_info:
# can't pass exc_info across processes so just format now
record.exc_text = self.formatException(record.exc_info)
record.exc_info = None
self.queue.put(record)
def formatException(self, ei):
sio = cStringIO.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()
if s[-1] == "\n":
s = s[:-1]
return s
Here's a class that can be used in a Windows environment; it requires ActivePython (for the win32api/win32event bindings). You can also inherit from other logging handlers (StreamHandler, etc.).
class SyncronizedFileHandler(logging.FileHandler):
MUTEX_NAME = 'logging_mutex'
def __init__(self , *args , **kwargs):
self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)
def emit(self, *args , **kwargs):
try:
win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
finally:
win32event.ReleaseMutex(self.mutex)
return ret
And here is an example that demonstrates usage:
import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool
def f(i):
time.sleep(random.randint(0,10) * 0.1)
ch = random.choice(letters)
logging.info( ch * 30)
def init_logging():
'''
    initialize the loggers
'''
formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
file_handler = SyncronizedFileHandler(sys.argv[1])
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
#must be called in the parent and in every worker process
init_logging()
if __name__ == '__main__':
#multiprocessing stuff
pool = Pool(processes=10)
imap_result = pool.imap(f , range(30))
for i , _ in enumerate(imap_result):
pass
Here's my simple hack/workaround... not the most comprehensive, but easy to modify, and simpler to read and understand, I think, than any other answers I found before writing this:
import logging
import multiprocessing
class FakeLogger(object):
def __init__(self, q):
self.q = q
def info(self, item):
self.q.put('INFO - {}'.format(item))
def debug(self, item):
self.q.put('DEBUG - {}'.format(item))
def critical(self, item):
self.q.put('CRITICAL - {}'.format(item))
def warning(self, item):
self.q.put('WARNING - {}'.format(item))
def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
# of course you can easily add this to your FakeLogger class
local_logger = logging.getLogger('local')
local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
local_logger.debug('hmm, something may need debugging here')
return num*2
def func_to_parallelize(data_chunk):
# unpack our args
the_num, logger_q = data_chunk
# since we're now in a new process, let's monkeypatch the logging module
logging.getLogger = lambda name=None: FakeLogger(logger_q)
# now do the actual work that happens to log stuff too
new_num = some_other_func_that_gets_logger_and_logs(the_num)
return (the_num, new_num)
if __name__ == '__main__':
multiprocessing.freeze_support()
m = multiprocessing.Manager()
logger_q = m.Queue()
# we have to pass our data to be parallel-processed
# we also need to pass the Queue object so we can retrieve the logs
parallelable_data = [(1, logger_q), (2, logger_q)]
# set up a pool of processes so we can take advantage of multiple CPU cores
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
worker_output = pool.map(func_to_parallelize, parallelable_data)
pool.close() # no more tasks
pool.join() # wrap up current tasks
# get the contents of our FakeLogger object
while not logger_q.empty():
print logger_q.get()
print 'worker output contained: {}'.format(worker_output)
One alternative is to write the multiprocessing logging to a known file and register an atexit handler to join on those processes and read the log back on stderr; however, you won't get a real-time flow of the output messages on stderr that way. A sketch of this idea follows.
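A minimal sketch of that approach, assuming a throwaway path like /tmp/workers.log and relying on short appends being effectively atomic on POSIX:
import atexit
import multiprocessing
import sys

LOG_PATH = '/tmp/workers.log'   # illustrative path

def worker():
    # Each worker appends to the known file instead of sharing a handler.
    with open(LOG_PATH, 'a') as f:
        f.write('hello from pid %d\n' % multiprocessing.current_process().pid)

def replay_log(procs):
    # Runs at interpreter exit: join the workers, then read the file
    # back onto stderr (so no real-time flow, as noted above).
    for p in procs:
        p.join()
    try:
        with open(LOG_PATH) as f:
            sys.stderr.write(f.read())
    except IOError:
        pass

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=worker) for _ in range(3)]
    atexit.register(replay_log, procs)
    for p in procs:
        p.start()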
If you have deadlocks occurring in a combination of locks, threads and forks in the logging module, that is reported in bug report 6721 (see also this related SO question).
There is a small fixup solution posted here.
However, that will just fix any potential deadlocks in logging. It will not fix that things are maybe garbled up. See the other answers presented here.
There is this great package.
Package: https://pypi.python.org/pypi/multiprocessing-logging/
Code: https://github.com/jruere/multiprocessing-logging
Install:
pip install multiprocessing-logging
Then add:
import multiprocessing_logging
# This enables logging inside the worker processes
multiprocessing_logging.install_mp_handler()
To my children, who may meet this same problem decades from now and find this question on this site, I leave this answer.
Simple vs. overcomplicated. Just use other tools. Python is awesome, but it was not designed to do some things.
The following snippet for the logrotate daemon works for me and doesn't overcomplicate things. Schedule it to run hourly, and:
/var/log/mylogfile.log {
size 1
copytruncate
create
rotate 10
missingok
postrotate
timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
endscript
}
This is how I install it (symlinks don't work for logrotate):
sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate