Question

I've got a large complex application that is heavily using the Python logging module.

I need to start getting these logs into a Kafka cluster, and need to ensure that I don't change the data along the way.

To me the ideal solution is to just create a new handler for Kafka - and allow the logs to go to both the old logging solution and kafka in parallel for a while. Then eventually shut off the old logging handlers and just send to Kafka.

However, I don't see any kafka logging handlers - only kafka clients. Adding a kafka client would mean tracking down every current logging call and adding a separate call to the new kafka client. Getting identical results will be difficult.
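To make the idea concrete, this is the shape of what I'm after; the KafkaLoggingHandler here is hypothetical, it's exactly the piece I can't find:

import logging

root = logging.getLogger()
# the existing logging solution stays exactly as it is
root.addHandler(logging.FileHandler("app.log"))
# the missing piece: a handler that ships the same records to Kafka
# root.addHandler(KafkaLoggingHandler("localhost", 9092, "app_logs"))  # hypothetical

# every existing logging call would then fan out to both destinations unchanged
logging.getLogger(__name__).info("one call, two destinations")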


Solution

The handler implementation is really simple. Honestly, setting up the environment took more time than writing the handler itself.

The handler constructor accepts an optional key argument. If it's provided, all messages are sent to the single partition selected by that key. If it's omitted, messages are distributed across partitions on a round-robin basis.

I haven't tested it heavily, but it's simple enough that I don't see much that could go wrong. I hope it's useful.

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer, KeyedProducer
import logging
import sys

class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        # drop kafka logging to avoid infinite recursion
        if record.name == 'kafka':
            return
        try:
            # use default formatting
            msg = self.format(record)
            # produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except Exception:
            # print the traceback to stderr instead of letting a
            # logging failure take the application down
            import traceback
            ei = sys.exc_info()
            traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
            del ei

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)

kh = KafkaLoggingHandler("localhost", 9092, "test_log")
# OR
# kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox", "dog")
try:
    import math
    math.exp(1000)
except:
    logger.exception("Problem with %s", "math.exp")

P.S. The handler uses this Kafka client: https://github.com/mumrah/kafka-python

OTHER TIPS

This is an awesome fix, thank you! The code has been updated somewhat in the past few years and some functions are now deprecated, but the overall design of this fix was very helpful, so thank you once again.

SimpleProducer (deprecated) --> KafkaProducer
SimpleConsumer (deprecated) --> KafkaConsumer
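For reference, the produce call changes roughly like this; this is my own sketch of the two kafka-python APIs, with 'test_log' as a placeholder topic:

# old, deprecated API (SimpleClient was formerly named KafkaClient)
from kafka import SimpleClient, SimpleProducer
client = SimpleClient('localhost:9092')
old_producer = SimpleProducer(client)
old_producer.send_messages('test_log', b'message bytes')

# current API
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test_log', b'message bytes')
producer.flush()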

Here is my revised snippet, using Kafka 1.0.0 and kafka-python 1.4.2. It covers just the producer, because I'm consuming via Logstash on the other end.

Hope this works for you!

tester.py (main routine)

# -*- coding: utf-8 -*-
"""Module to test out logging to kafka."""

import logging

from utils.kafka_handler import KafkaHandler


def run_it(logger=None):
    """Run the actual connections."""
    # fall back to a module-level logger if none is passed in
    logger = logger or logging.getLogger(__name__)
    # enable the debug logger if you want to see ALL of the lines
    # logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)

    kh = KafkaHandler(['localhost:9092'], 'sebtest')
    logger.addHandler(kh)

    logger.info("I'm a little logger, short and stout")
    logger.debug("Don't tase me bro!")


if __name__ == "__main__":
    run_it()

utils/kafka_handler.py (utility for logging)

# -*- coding: utf-8 -*-
"""Module to provide kafka handlers for internal logging facility."""

import json
import logging

from kafka import KafkaProducer


class KafkaHandler(logging.Handler):
    """Class to instantiate the kafka logging facility."""

    def __init__(self, hostlist, topic='corp_it_testing', tls=None):
        """Initialize an instance of the kafka handler."""
        logging.Handler.__init__(self)
        # tls is accepted for future use but not applied yet
        self.producer = KafkaProducer(bootstrap_servers=hostlist,
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                      linger_ms=10)
        self.topic = topic

    def emit(self, record):
        """Emit the provided record to the kafka_client producer."""
        # drop kafka logging to avoid infinite recursion
        if record.name == 'kafka' or record.name.startswith('kafka.'):
            return

        try:
            # apply the logger formatter
            msg = self.format(record)
            self.producer.send(self.topic, {'message': msg})
            # force delivery of each record rather than waiting on linger_ms batching
            self.flush(timeout=1.0)
        except Exception:
            logging.Handler.handleError(self, record)

    def flush(self, timeout=None):
        """Flush the objects."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()

            logging.Handler.close(self)
        finally:
            self.release()
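One extra note: the handler publishes whatever self.format(record) returns, so if you want timestamps and levels inside the JSON 'message' field for Logstash, attach a formatter to the handler. A minimal sketch, assuming the same KafkaHandler and topic as above:

import logging
from utils.kafka_handler import KafkaHandler

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

kh = KafkaHandler(['localhost:9092'], 'sebtest')
# bake timestamp, level and logger name into the serialized message
kh.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
logger.addHandler(kh)

logger.info("arrives in Kafka as JSON with a fully formatted 'message' field")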
Licensed under: CC-BY-SA with attribution