Pregunta

Estoy tratando de configurar un consumidor de Java básico para recibir mensajes de un tema de Kafka. He seguido la muestra en - https://cwiki.apache.org / confluencia / pantalla / kafka / consumidor + grupo + ejemplo - y tenga este código:

package org.example.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaClientMain 
{

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;  


    public KafkaClientMain(String a_zookeeper, String a_groupId, String a_topic) 
    {
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;
    }    


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1000");
        props.put("zookeeper.sync.time.ms", "1000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }    

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }    


    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        System.out.println( "streams.size = " + streams.size() );

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }    


    public static void main(String[] args) 
    {


        String zooKeeper = "ec2-whatever.compute-1.amazonaws.com:2181";
        String groupId = "group1";
        String topic = "test";

        int threads = 1;

        KafkaClientMain example = new KafkaClientMain(zooKeeper, groupId, topic);

        example.run(threads);

    }

}

y

package org.example.kafka.client;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable 
{

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) 
    {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() 
    {
        System.out.println( "calling ConsumerTest.run()" );
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext())
        {    
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }


        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Kafka se está ejecutando en el anfitrión de EC2 en cuestión, y puedo enviar y recibir mensajes en el tema "Prueba" utilizando las herramientas kafka-console-producer.sh y kafka-console-consumer.sh. El puerto 2181 está abierto y disponible en la máquina donde el consumidor se está ejecutando (y así es 9092 por una buena medida, pero eso no parecía ayudar tampoco).

Desafortunadamente, nunca recibo ningún mensaje en mi consumidor cuando ejecuto esto. Ni los mensajes existentes en el tema, ni los mensajes recién enviados que envíen usando kafka-console-producer.sh, mientras que el consumidor se está ejecutando.

Esto está usando Kafka 0.8.1.1 que se ejecuta en CENTOS 6.4 X64, utilizando OpenJDK 1.7.0_65.

Editar: FWIW, cuando se inicia el programa de consumo, veo esta salida de ZOOKEEPER:

[2014-08-01 15:56:38,045] INFO Accepted socket connection from /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,049] INFO Client attempting to establish new session at /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-08-01 15:56:38,053] INFO Established session 0x1478e963fb30008 with negotiated timeout 6000 for client /98.101.159.194:24218 (org.apache.zookeeper.server.NIOServerCnxn)

¿Alguna idea de qué podría estar sucediendo con esto? Cualquier ayuda es muy apreciada.

¿Fue útil?

Solución

Respondiendo a esto para la posteridad, en caso de que alguien más se extienda a través de un problema similar.

El problema fue este: el agente de Kafka y el ZOOKEEPER estaban en un nodo EC2, y el consumidor estaba en mi computadora portátil corriendo localmente.Cuando se conecta a ZOOKEEPER, el cliente se le dio una referencia a "IP-10-0-X-X.EC2.Internal", que no se resuelve (por defecto) desde fuera de EC2.Esto quedó claro una vez que configuré correctamente log4j en el cliente, por lo que estaba obteniendo todos los mensajes de registro.

La solución alternativa fue simplemente poner una entrada en el archivo / etc / hosts, asignando el nombre de host interno EC2 a la dirección IP enrutable públicamente.

Otros consejos

Puede resolver este problema utilizando la configuración siguiente en la propiedad en el archivo servidor.properties que se encuentra en la carpeta de configuración de Kafka

anunciado.host.name= DNS público del servidor EC2

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top