Question

J'essaie de configurer un consommateur de base Java pour recevoir des messages d'un sujet de Kafka. J'ai suivi l'échantillon à - https://cwiki.apache.org.org / confluence / affichage / kafka / consommateur + groupe + exemple - et avoir ce code:

package org.example.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClientMain 
{

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;  


    public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) 
    {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;
    }    


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }    

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }    


    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println( "streams.size = " + streams.size() );

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }    


    public static void main(String[] args) 
    {


        String zooKeeper = "ec2-whatever.compute-1.amazonaws.com:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 1;

        KafkaClientMain example = new KafkaClientMain(zooKeeper, groupId, topic);

        example.run(threads);

    }

}

et

package org.example.kafka.client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable 
{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) 
    {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() 
    {
        System.out.println( "calling ConsumerTest.run()" );
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext())
        {    
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }


        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

KAFKA est en cours d'exécution sur l'hôte EC2 en question et je peux envoyer et recevoir des messages sur le sujet "Test" à l'aide des outils Kafka-Console-ProDucer.sh et Kafka-Console-Consumer.sh. Le port 2181 est ouvert et disponible à partir de la machine où le consommateur est en cours d'exécution (et est donc 9092 pour une bonne mesure, mais cela ne semblait pas aider non plus).

Malheureusement, je ne reçois jamais de messages dans mon consommateur lorsque je l'exécute. Ni les messages existants sur le sujet, ni les messages nouvellement envoyés que j'envoie à l'aide de kafka-console-producer.sh, tandis que le consommateur est en cours d'exécution.

Ceci utilise Kafka 0.8.1.1 en cours d'exécution sur Centos 6.4 x64, à l'aide de OpenJDK 1.7.0_65.

EDIT: FWIW, lorsque le programme de consommation commence, je vois cette sortie de zookePer:

[2014-08-01 15:56:38,045] INFO Accepted socket connection from /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,049] INFO Client attempting to establish new session at /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,053] INFO Established session 0x1478e963fb30008 with negotiated timeout 6000 for client /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)

aucune idée de ce qui pourrait se passer avec cela? Toute aide est très appréciée.

Était-ce utile?

La solution

Répondre à cela moi-même pour la postérité, au cas où quelqu'un d'autre traverse un problème similaire.

La question était la suivante: le courtier Kafka et le zoookeper étaient sur un nœud EC2 et le consommateur était sur mon ordinateur portable fonctionnant localement.Lors de la connexion à Zookeper, le client a été remis une référence à «IP-10-0-X-X.EC2.Internal», qui ne résout pas (par défaut) de l'extérieur de EC2.Cela est devenu évident une fois que j'ai correctement configuré Log4J sur le client pour que je reçois tous les messages du journal.

La solution de contournement consistait à mettre une entrée dans le fichier mon / etc / hosts, cartographier le nom d'hôte interne EC2 à l'adresse IP routable publiquement.

Autres conseils

Vous pouvez résoudre ce problème à l'aide de paramètres suivants dans le fichier Server.Properties situé sous le dossier de configuration de Kafka

annoncé.host.name= DNS public du serveur EC2

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top