Question

The following code just blocks the execution. It seems that map could not return the lazy-sequence for some reason. I am not sure why.

(ns testt.consumer
  (:require
    ;internal
    ;external
    [clojure.walk   :refer [stringify-keys]]
    [clojure.pprint :as pprint])
  (:import
    [kafka.consumer         ConsumerConfig Consumer KafkaStream ]
    [kafka.javaapi.consumer ConsumerConnector                   ]
    [kafka.message          MessageAndMetadata                  ]
    [java.util              Properties                          ])
  (:gen-class))

; internal

(defn hashmap-to-properties
  [h]
  (doto (Properties.)
    (.putAll (stringify-keys h))))

; external

(defn consumer-connector
  [h]
  (let [config (ConsumerConfig. (hashmap-to-properties h))]
    (Consumer/createJavaConsumerConnector config)))

(defn message-stream
  [^ConsumerConnector consumer topic thread-pool-size]
  ;this is dealing only with the first stream, needs to be fixed to support multiple streams
  (let [ stream (first (.get (.createMessageStreams consumer {topic thread-pool-size}) topic)) ]
    (map #(.message %) (iterate (.next (.iterator ^KafkaStream stream))))))

The configuration the connector expects:

{  :zookeeper.connect              "10.0.0.1:2181"
   :group.id                       "test-0"
   :thread.pool.size               "1"
   :topic                          "test_topic"
   :zookeeper.session.timeout.ms   "1000"
   :zookeeper.sync.time.ms         "200"
   :auto.commit.interval.ms        "1000"
   :auto.offset.reset              "smallest"
   :auto.commit.enable             "true" }
Was it helpful?

Solution

Actually there is no need for Kafka's iterate because Clojure provides a better way of dealing with streams. You can think about Kafka streams as a sequence and just do the following:

(doseq 
 [^kafka.message.MessageAndMetadata message stream] (do-some-stuff message))

This is probably the most efficient way of doing that.

More of the code is here:

https://github.com/l1x/shovel

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top