Domanda

Sto cercando di impostare un consumatore di base Java per ricevere messaggi da un argomento Kafka. Ho seguito il campione in - https://cwiki.apache.org / Confluence / Display / Kafka / Consumer + Group + Esempio - E avere questo codice:

package org.example.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClientMain 
{

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;  


    public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) 
    {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;
    }    


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }    

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }    


    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println( "streams.size = " + streams.size() );

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }    


    public static void main(String[] args) 
    {


        String zooKeeper = "ec2-whatever.compute-1.amazonaws.com:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 1;

        KafkaClientMain example = new KafkaClientMain(zooKeeper, groupId, topic);

        example.run(threads);

    }

}
.

e

package org.example.kafka.client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable 
{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) 
    {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() 
    {
        System.out.println( "calling ConsumerTest.run()" );
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext())
        {    
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }


        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
.

Kafka è in esecuzione sull'host EC2 in questione, e posso inviare e ricevere messaggi sull'argomento "Test" utilizzando Kafka-Console-producer.sh e Kafka-Console-Consumer.sh Tools. La porta 2181 è aperta e disponibile dalla macchina in cui il consumatore è in esecuzione (e così è 9092 per buona misura, ma questo non sembrava aiutare neanche).

Sfortunatamente, non ricevo mai messaggi nel mio consumatore quando corro questo. Né messaggi esistenti sull'argomento, né inviato nuovi messaggi che invii usando kafka-console-producer.sh, mentre il consumatore è in esecuzione.

Questo utilizza Kafka 0.8.1.1 in esecuzione su Centos 6.4 x64, utilizzando OpenJDK 1.7.0_65.

Modifica: FWIW, quando inizia il programma di consumo, vedo questa uscita Zookeeper:

[2014-08-01 15:56:38,045] INFO Accepted socket connection from /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,049] INFO Client attempting to establish new session at /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,053] INFO Established session 0x1478e963fb30008 with negotiated timeout 6000 for client /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
.

Qualche idea che potrebbe andare avanti con questo? Qualsiasi e tutto l'aiuto è molto apprezzato.

È stato utile?

Soluzione

Rispondendo a me stesso per i posteri, nel caso qualcun altro attraversa un problema simile.

Il problema è stato questo: il Kafka Broker e Zookeeper erano in un nodo EC2, e il consumatore era sul mio portatile in esecuzione localmente.Quando si collega a Zookeeper, il client è stato consegnato un riferimento a "IP-10-0-X-X.EC2.internal", che non risolve (per impostazione predefinita) dall'esterno della EC2.Ciò è diventato chiaro una volta configurato correttamente il log4j sul client in modo da ottenere tutti i messaggi di registro.

La soluzione alternativa è stata quella di inserire una voce nel file My / etc / hosts, mappare il nome host interno EC2 sull'indirizzo IP insinuabile pubblicamente.

Altri suggerimenti

È possibile risolvere questo problema utilizzando l'impostazione seguente Proprietà in Server.Properties File situato sotto la cartella Config Kafka

pubblicizzato.host.name= DNS pubblico di server EC2

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top