Pregunta

Puedo enviar Cadenas de mensajes a Kafka V.0.8 con el Java Productor de la API.Si el tamaño del mensaje es de unos 15 MB puedo obtener un MessageSizeTooLargeException.He tratado de establecer message.max.bytesa 40 MB, pero todavía se me pone la excepción.Pequeños mensajes funcionó sin problemas.

(La excepción aparece en el productor, no tengo un consumidor en esta solicitud).

¿Qué puedo hacer para deshacerse de esta excepción?

Mi ejemplo productor config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Errores De Registro:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
¿Fue útil?

Solución

Necesita ajustar tres (o cuatro) propiedades:

  • Lado del consumidor:fetch.message.max.bytes - esto determinará el tamaño más grande de un mensaje que el consumidor puede recuperar.
  • Lado del corredor: replica.fetch.max.bytes - esto permitirá que las réplicas de los intermediarios envíen mensajes dentro del clúster y garantizará que los mensajes se repliquen correctamente.Si es demasiado pequeño, el mensaje nunca se replicará y, por lo tanto, el consumidor nunca verá el mensaje porque el mensaje nunca se confirmará (se replicará por completo).
  • Lado del corredor: message.max.bytes - este es el tamaño más grande del mensaje que puede recibir el corredor de un productor.
  • Lado del corredor (por tema): max.message.bytes - este es el tamaño más grande del mensaje que el corredor permitirá agregar al tema.Este tamaño está validado antes de la compresión.(Por defecto, el del corredor message.max.bytes.)

Descubrí por las malas el número 2: no recibe NINGUNA excepción, mensaje o advertencia de Kafka, así que asegúrese de considerar esto cuando envíe mensajes grandes.

Otros consejos

Se requieren cambios menores para Kafka 0.10 y el nuevo consumidor en comparación con La respuesta del hombre riendo:

  • Corredor:No hay cambios, aún necesitas aumentar las propiedades. message.max.bytes y replica.fetch.max.bytes. message.max.bytes tiene que ser igual o menor (*) que replica.fetch.max.bytes.
  • Productor:Aumentar max.request.size para enviar el mensaje más amplio.
  • Consumidor:Aumentar max.partition.fetch.bytes para recibir mensajes más grandes.

(*) Lea los comentarios para saber más sobre message.max.bytes<=replica.fetch.max.bytes

Debe anular las siguientes propiedades:

Configuraciones del corredor ($KAFKA_HOME/config/server.properties)

  • réplica.fetch.max.bytes
  • mensaje.max.bytes

Configuraciones del consumidor ($KAFKA_HOME/config/consumer.properties)
Este paso no funcionó para mí.Lo agregué a la aplicación para consumidores y funcionó bien.

  • recuperar.mensaje.max.bytes

Reinicie el servidor.

Mire esta documentación para obtener más información:http://kafka.apache.org/08/configuration.html

La idea es enviar un mensaje de igual tamaño desde Kafka Producer a Kafka Broker y luego recibirlo Kafka Consumer, es decir,

Productor de Kafka --> Broker de Kafka --> Consumidor de Kafka

Supongamos que si el requisito es enviar 15 MB de mensaje, entonces el Productor, el Corredor y el Consumidor, los tres, deben estar sincronizados.

Productor Kafka envía 15 MB --> Corredor Kafka Permite/almacena 15 MB --> Consumidor Kafka recibe 15 MB

Por lo tanto, la configuración debería ser:

a) sobre el corredor:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) sobre el Consumidor:

fetch.message.max.bytes=15728640

Una cosa clave a recordar que message.max.bytes atributo debe ser en sync con el acuerdo del consumidor fetch.message.max.bytes de la propiedad.el tamaño de la búsqueda debe ser al menos tan grande como el tamaño máximo de mensaje, de otra manera no podría ser la situación en la que los productores pueden enviar mensajes más grande que el consumidor puede consumir/fetch.Es posible que vale la pena echar un vistazo.
La versión de la obra de Kafka está utilizando?También se proporcionan algunos detalles más rastro que usted está consiguiendo.¿hay alguna cosa como ... payload size of xxxx larger than 1000000 la llegada en el registro?

La respuesta de @laughing_man es bastante precisa.Pero aún así quería dar una recomendación que aprendí del experto en Kafka. Stéphane Maarek de Quora.

Kafka no está destinado a manejar mensajes grandes.

Su API debe usar almacenamiento en la nube (por ejemplo, AWS S3) y simplemente enviar a Kafka o cualquier intermediario de mensajes una referencia de S3.Debe encontrar un lugar para conservar sus datos, tal vez sea una unidad de red, tal vez sea lo que sea, pero no debería ser un intermediario de mensajes.

Ahora, si no quieres seguir con la solución anterior

El tamaño máximo del mensaje es 1 MB (la configuración en sus corredores se llama message.max.bytes) Apache Kafka.Si realmente lo necesitara con urgencia, podría aumentar ese tamaño y asegurarse de aumentar los buffers de red para sus productores y consumidores.

Y si realmente le importa dividir su mensaje, asegúrese de que cada división de mensajes tenga exactamente la misma clave para que se envíe a la misma partición, y que el contenido de su mensaje debe informar una "identificación de parte" para que su consumidor pueda reconstruir completamente el mensaje. .

También puede explorar la compresión, si su mensaje está basado en texto (compresión gzip, snappy, lz4), lo que puede reducir el tamaño de los datos, pero no mágicamente.

Nuevamente, debe usar un sistema externo para almacenar esos datos y simplemente enviar una referencia externa a Kafka.Esa es una arquitectura muy común, una que usted debe seguir y que es ampliamente aceptada.

Tenga esto en cuenta que Kafka funciona mejor sólo si los mensajes son enormes en cantidad pero no en tamaño.

Fuente: https://www.quora.com/Cómo-envío-mensajes-grandes-80-MB-in-Kafka

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top