我向 Kafka V 发送字符串消息。0.8 带有 Java Producer API。如果消息大小约为 15 MB,我会得到 MessageSizeTooLargeException. 。我尝试过设置 message.max.bytes到 40 MB,但我仍然遇到异常。小消息没有问题。

(异常出现在生产者中,我在这个应用程序中没有消费者。)

我该怎么做才能摆脱这个异常?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
有帮助吗?

解决方案

需要调整三(或四个)属性:

  • 消费者侧:fetch.message.max.bytes - 这将确定消费者可以获取的消息的最大大小。
  • broker side:replica.fetch.max.bytes - 这将允许经纪商中的副本在群集中发送消息,并确保正确复制消息。如果这太小,那么消息将永远不会被复制,因此,消费者永远不会看到消息,因为消息永远不会被提交(完全复制)。
  • 经纪人侧:message.max.bytes - 这是可以由经纪人从生产者接收的消息的最大大小。
  • Broker Side(PER主题):max.message.bytes - 这是代理将允许附加到主题的最大消息的大小。此大小被验证预压缩。 (默认为Broker的message.max.bytes。)

我发现了大约2的艰难方式 - 你没有得到kafka的任何例外,消息或警告,所以当您发送大消息时务必考虑这一点。

其他提示

需要进行小的更改 卡夫卡0.10新消费者 相比 笑人的回答:

  • 经纪人:没有变化,还需要增加属性 message.max.bytesreplica.fetch.max.bytes. message.max.bytes 必须等于或小于 (*) replica.fetch.max.bytes.
  • 制作人:增加 max.request.size 发送更大的消息。
  • 消费者:增加 max.partition.fetch.bytes 接收更大的消息。

(*) 阅读评论以了解更多信息 message.max.bytes<=replica.fetch.max.bytes

您需要覆盖以下属性:

broker configs($ kafka_home / config / server.properties)

  • replica.fetch.max.Bytes
  • message.max.Bytes

fucminer configs($ kafka_home / configer.properties)
这一步对我没有工作。我将它添加到消费者应用程序,它正在工作正常

  • fetch.message.max.Bytes

重新启动服务器。

查看此文档以获取更多信息: http://kafka.apache.org/08/configuration.html

这个想法是将从Kafka Producer发送到Kafka Broker的相同大小的消息,然后由Kafka Consumer i.e收到。

kafka生产商 - > kafka broker - > kafka消费者

假设要求是要发送15MB的消息,然后生产者 Broker 以及消费者,所有三个,需要同步。

kafka生产商发送15 MB - > Kafka Broks 允许/存储15 MB - > <强大> kafka消费者收到15 MB

因此应该是:

Broker上的

a):

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640
.

b)在消费者上:

fetch.message.max.bytes=15728640
.

要记住的一件关键事情 message.max.bytes 属性必须是 同步中 与消费者的 fetch.message.max.bytes 财产。获取大小必须至少与最大消息大小一样大,否则可能会出现生产者发送的消息大于消费者可以消费/获取的消息的情况。也许值得一看。
您使用的是哪个版本的 Kafka?还提供您获得的更多详细信息跟踪。有没有类似... payload size of xxxx larger than 1000000 出现在日志中?

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top