如何使用 Kafka 发送大消息(超过 15MB)?
-
21-12-2019 - |
题
我向 Kafka V 发送字符串消息。0.8 带有 Java Producer API。如果消息大小约为 15 MB,我会得到 MessageSizeTooLargeException
. 。我尝试过设置 message.max.bytes
到 40 MB,但我仍然遇到异常。小消息没有问题。
(异常出现在生产者中,我在这个应用程序中没有消费者。)
我该怎么做才能摆脱这个异常?
我的示例生产者配置
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
错误日志:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
解决方案
需要调整三(或四个)属性:
- 消费者侧:
fetch.message.max.bytes
- 这将确定消费者可以获取的消息的最大大小。 - broker side:
replica.fetch.max.bytes
- 这将允许经纪商中的副本在群集中发送消息,并确保正确复制消息。如果这太小,那么消息将永远不会被复制,因此,消费者永远不会看到消息,因为消息永远不会被提交(完全复制)。 - 经纪人侧:
message.max.bytes
- 这是可以由经纪人从生产者接收的消息的最大大小。 - Broker Side(PER主题):
max.message.bytes
- 这是代理将允许附加到主题的最大消息的大小。此大小被验证预压缩。 (默认为Broker的message.max.bytes
。)
我发现了大约2的艰难方式 - 你没有得到kafka的任何例外,消息或警告,所以当您发送大消息时务必考虑这一点。
其他提示
您需要覆盖以下属性:
broker configs($ kafka_home / config / server.properties)
- replica.fetch.max.Bytes
- message.max.Bytes
fucminer configs($ kafka_home / configer.properties)
这一步对我没有工作。我将它添加到消费者应用程序,它正在工作正常
- fetch.message.max.Bytes
重新启动服务器。
查看此文档以获取更多信息: http://kafka.apache.org/08/configuration.html
这个想法是将从Kafka Producer发送到Kafka Broker的相同大小的消息,然后由Kafka Consumer i.e收到。
kafka生产商 - > kafka broker - > kafka消费者
假设要求是要发送15MB的消息,然后生产者, Broker 以及消费者,所有三个,需要同步。
kafka生产商发送15 MB - > Kafka Broks 允许/存储15 MB - > <强大> kafka消费者收到15 MB
因此应该是:
Broker上的a):
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
.
b)在消费者上:
fetch.message.max.bytes=15728640
. 要记住的一件关键事情 message.max.bytes
属性必须是 同步中 与消费者的 fetch.message.max.bytes
财产。获取大小必须至少与最大消息大小一样大,否则可能会出现生产者发送的消息大于消费者可以消费/获取的消息的情况。也许值得一看。
您使用的是哪个版本的 Kafka?还提供您获得的更多详细信息跟踪。有没有类似... payload size of xxxx larger
than 1000000
出现在日志中?
@laughing_man的答案非常准确。但我仍然是从Quora从Kafka Expert Stephane Maarek
kafka并不意味着处理大消息。
您的API应该使用云存储(ex aws s3),并推到Kafka或任何消息代理A参考S3。您必须在某处找到保留您的数据,也许是网络驱动器,也许是无论如何,但它不应该是消息代理。
现在,如果您不想使用上述解决方案
消息最大大小为1MB(经纪人中的设置名为message.max.bytes
) apache kafka 。如果您真的需要它,您可以增加这一规模并确保为生产者和消费者增加网络缓冲区。
如果您真正关心拆分邮件,请确保每个邮件拆分具有完全相同的键,以便它被推到相同的分区,并且您的消息内容应该报告“部分ID”,以便您的消费者可以完全报告“部分ID”。重建消息。
您还可以探索压缩,如果您的消息是基于文本的(GZIP,Snappy,LZ4压缩),这可能会降低数据大小,但不是神奇地。
再次,您必须使用外部系统来存储该数据并仅推送对Kafka的外部引用。这是一个非常常见的架构,你应该和广泛接受的架构。
来源: https:/ / www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka