Pregunta

soy muy nuevo en Kafka y estamos usando Kafka 0.8.1.

Lo que tengo que hacer es consumir un mensaje del tema.Para eso, tendré que escribir un consumidor en Java que consumirá un mensaje del tema y luego lo guardará en la base de datos.Después de guardar un mensaje, se enviará una confirmación al consumidor de Java.Si el reconocimiento es verdadero, el siguiente mensaje debe consumirse del tema.Si el reconocimiento es falso (lo que significa que debido a algún mensaje de error, leído del tema, no se pudo guardar en la base de datos), entonces nuevamente se debe leer ese mensaje.

creo que necesito usar Simple Consumer, para tener control sobre la compensación de mensajes y haber analizado el ejemplo de Consumidor simple como se muestra en este enlace https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Ejemplo.

En este ejemplo, el desplazamiento se evalúa en el método de ejecución como 'readOffset'.¿Necesito jugar con eso?Por ej.Puedo usar LatestTime() en lugar de EarliestTime() y en caso de falso, restableceré el desplazamiento al que tenía antes de usar offset - 1.

¿Es así como debo proceder?

¿Fue útil?

Solución

Creo que puedes llevarte bien usando el consumidor de alto nivel (http://kafka.apache.org/documentation.html#highlevelconsumerapi), debería ser más fácil de usar que SimpleConsumer.No creo que el consumidor necesite volver a leer los mensajes de Kafka sobre fallas en la base de datos, ya que el consumidor ya tiene esos mensajes y puede reenviarlos a la base de datos o hacer cualquier otra cosa que considere conveniente.

Los consumidores de alto nivel almacenan el último desplazamiento leído de una partición específica en Zookeeper (según el nombre del grupo de consumidores), de modo que cuando un proceso de consumidor finaliza y luego se reinicia (potencialmente en otro host), pueda continuar procesando mensajes donde Parado.Es posible guardar automáticamente este desplazamiento en Zookeeper periódicamente (consulte las propiedades del consumidor auto.commit.enable y auto.commit.interval.ms), o hacer que la lógica de la aplicación lo guarde llamando ConsumerConnector.commitOffsets .Ver también https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example .

Le sugiero que desactive la confirmación automática y confirme sus compensaciones una vez que haya recibido el reconocimiento de la base de datos.Por lo tanto, puede asegurarse de que los mensajes no procesados ​​se vuelvan a leer desde Kafka en caso de falla del consumidor y que todos los mensajes enviados a Kafka eventualmente lleguen a la base de datos al menos una vez (pero no "exactamente una vez").

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top