Context manager for logging equivalent to contextmanager=>yield stdout

2k views Asked by At

A library I'm using takes a function like this:

@contextlib.contextmanager
def stdout_cm():
    """Context manager whose ``with`` target is the current ``sys.stdout``."""
    stream = sys.stdout
    yield stream

Now, rather than calling foo(stdout_cm), how do I pass in a logging instance that tees to multiple handlers (stdout and a StringIO)?

Attempt:

from sys import stdout, stderr
from cStringIO import StringIO
from logging import getLogger, basicConfig, StreamHandler
from subprocess import check_call

# Configure the root logger: timestamped, column-aligned records at INFO level.
basicConfig(format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
            datefmt='%m-%d %H:%M', level='INFO')

# Logger for this module, plus a handler that writes into an in-memory buffer
# so the captured text can be read back later via ``console.stream``.
log = getLogger(__name__)
console = StreamHandler(StringIO())

# Tee: every record sent to ``log`` goes to the in-memory buffer and to stdout.
log.addHandler(console)
log.addHandler(StreamHandler(stdout))

class LoggerContext(object):
    """Context manager that temporarily changes a logger's level.

    Parameters:
        logger: the ``logging.Logger`` to manage.
        level:  level to apply for the duration of the ``with`` block, or
                ``None`` to leave the logger's level untouched.
        close:  when true, ``close()`` is called on every handler attached
                to the logger when the block exits.

    The instance is also callable and returns itself, so it can be handed to
    code that expects a zero-argument factory producing a context manager
    (``with f() as fh:``).
    """

    def __init__(self, logger, level=None, close=True):
        self.logger = logger
        self.level = level
        self.close = close
        # Level that was in effect before __enter__; None until one is saved.
        self.old_level = None

    def __call__(self):
        return self

    def __enter__(self):
        # Remember the previous level so __exit__ can put it back.
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)

    def __exit__(self, et, ev, tb):
        # Restore the level first, and do it regardless of ``close``:
        # otherwise the temporary level would leak past the ``with`` block.
        if self.level is not None and self.old_level is not None:
            self.logger.setLevel(self.old_level)
            self.old_level = None
        if not self.close:
            return
        # Iterate over a snapshot in case a handler's close() touches the list.
        for handler in list(self.logger.handlers):
            handler.close()

Simplified function:

def foo(f):
    """Run the demo shell pipeline, sending its output to the handle from ``f``.

    ``f`` is called with no arguments and must return a context manager; the
    object bound by ``with ... as`` is passed to the child process as both
    its stdout and its stderr.
    """
    command = ["printf '\n\n' | wc -l"]
    with f() as stream:
        check_call(command, shell=True, stdout=stream, stderr=stream)

# Run the demo, passing the logger-backed context manager where the library
# expects a callable that returns a context manager.
foo(LoggerContext(log)) #, level='WARN')


# Show whatever the in-memory handler captured (Python 2 print statement).
print console.stream.getvalue()
1

There is 1 answer

0
A T On BEST ANSWER

Prelude:

import os
import logging
import subprocess

from io import IOBase
from sys import stdout
from select import select
from threading import Thread
from time import sleep

# Module-level logger, named after the module it lives in.
log = logging.getLogger(__name__)

Class (based on https://stackoverflow.com/a/4838875):

class StreamLogger(IOBase):
    """File-like object that forwards lines written to its fd to a logger.

    An OS pipe is created; ``fileno()`` exposes the write end so the object
    can be passed as ``stdout``/``stderr`` to ``subprocess``. A background
    thread reads the other end, splits the bytes on newlines and logs each
    line via ``logger_obj.log(level, line)``.

    Parameters:
        logger_obj: object with a ``log(level, msg)`` method (e.g. a
                    ``logging.Logger``).
        level:      level passed to ``logger_obj.log`` for every line.

    Being an ``IOBase``, the object works in a ``with`` statement; leaving
    the block calls ``close()``. It is also callable and returns itself, so
    it can be used where a zero-argument context-manager factory is expected.
    """

    # Lifecycle flag: None = not running, True = running, False = stop requested.
    _run = None
    # Seconds the reader waits for data before re-checking the stop flag.
    _POLL_INTERVAL = 1
    # Maximum number of bytes fetched from the pipe per read.
    _CHUNK_SIZE = 1024

    def __init__(self, logger_obj, level):
        super(StreamLogger, self).__init__()
        self.logger_obj = logger_obj
        self.level = level
        self.pipe = os.pipe()
        # Mark as running *before* starting the thread, so a close() that
        # arrives immediately is never mistaken for "nothing to stop".
        self._run = True
        self.thread = Thread(target=self._flusher)
        self.thread.start()

    def __call__(self): return self

    def _flusher(self):
        """Reader loop: log complete lines until EOF or an idle stop request."""
        read_fd = self.pipe[0]
        pending = b''
        try:
            while True:
                if select([read_fd], [], [], self._POLL_INTERVAL)[0]:
                    chunk = os.read(read_fd, self._CHUNK_SIZE)
                    if not chunk:
                        # EOF: every write end of the pipe has been closed.
                        break
                    pending += chunk
                    lines = pending.split(b'\n')
                    # The last element is the (possibly empty) unfinished line.
                    pending = lines.pop()
                    for line in lines:
                        # 'replace' keeps one bad byte from killing the thread.
                        self.write(line.decode('utf-8', 'replace'))
                elif not self._run:
                    # Stop requested and the pipe stayed quiet for a full
                    # poll interval: nothing more to drain.
                    break
            if pending:
                # Do not lose a final line that lacks a trailing newline.
                self.write(pending.decode('utf-8', 'replace'))
        finally:
            self._run = None

    def write(self, data): return self.logger_obj.log(self.level, data)
    def fileno(self): return self.pipe[1]

    def close(self):
        """Stop the reader thread, release both pipe ends; safe to call twice."""
        if not self.closed:
            self._run = False
            # Closing our write end lets the reader see EOF as soon as no
            # other process still holds a copy of it.
            os.close(self.pipe[1])
            self.thread.join()
            os.close(self.pipe[0])
            self._run = None
        # Let IOBase record the closed state so finalisation does not retry.
        super(StreamLogger, self).close()

Usage:

# Route a child process's stderr into the logger at ERROR level (``out`` is
# available for stdout at INFO level; this command sends everything to stderr).
with StreamLogger(log, logging.INFO) as out, StreamLogger(log,
                                                          logging.ERROR) as err:
    # Wait for the child before leaving the block: exiting closes the
    # loggers, and output written after that point would be lost.
    subprocess.Popen('ls 1>&2', stderr=err, shell=True).wait()