Kafka를 사용하여 대용량 메시지(15MB 이상)를 보내려면 어떻게 해야 하나요?
-
21-12-2019 - |
문제
Kafka V에 문자열 메시지를 보냅니다.0.8(Java 생산자 API 포함)메시지 크기가 약 15MB이면 MessageSizeTooLargeException
.설정해 보았습니다 message.max.bytes
최대 40MB까지 가능하지만 여전히 예외가 발생합니다.작은 메시지는 문제 없이 작동했습니다.
(예외는 생산자에 나타납니다. 이 애플리케이션에는 소비자가 없습니다.)
이 예외를 없애려면 어떻게 해야 합니까?
내 예제 생산자 구성
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
오류 기록:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
해결책
3개(또는 4개) 속성을 조정해야 합니다.
- 소비자 측:
fetch.message.max.bytes
- 소비자가 가져올 수 있는 메시지의 최대 크기를 결정합니다. - 브로커 측:
replica.fetch.max.bytes
- 이렇게 하면 브로커의 복제본이 클러스터 내에서 메시지를 보내고 메시지가 올바르게 복제되는지 확인할 수 있습니다.이 값이 너무 작으면 메시지가 복제되지 않으므로 메시지가 커밋(완전히 복제)되지 않으므로 소비자는 메시지를 볼 수 없습니다. - 브로커 측:
message.max.bytes
- 브로커가 생산자로부터 수신할 수 있는 메시지의 최대 크기입니다. - 브로커 측(주제별):
max.message.bytes
- 브로커가 주제에 추가하도록 허용하는 메시지의 최대 크기입니다.이 크기는 압축 전 검증되었습니다.(기본값은 브로커의message.max.bytes
.)
나는 2번에 대해 어려운 방법을 찾았습니다. Kafka에서는 예외, 메시지 또는 경고를 받지 않으므로 대용량 메시지를 보낼 때 이 점을 반드시 고려하십시오.
다른 팁
사소한 변경이 필요함 카프카 0.10 그리고 새로운 소비자 비교하다 Laughing_man의 답변:
- 브로커:변경 사항이 없습니다. 여전히 속성을 늘려야 합니다.
message.max.bytes
그리고replica.fetch.max.bytes
.message.max.bytes
다음과 같거나 작아야 합니다(*).replica.fetch.max.bytes
. - 생산자:증가하다
max.request.size
더 큰 메시지를 보내려면 - 소비자:증가하다
max.partition.fetch.bytes
더 큰 메시지를 받으려면.
(*) 자세한 내용은 댓글을 읽어보세요. message.max.bytes
<=replica.fetch.max.bytes
다음 속성을 무시해야합니다.
브로커 구성 ($ kafka_home / config / server.properties)
- replica.fetch.max.bytes
- message.max.bytes
소비자 구성 ($ kafka_home / config / consumer.properties)
이 단계는 나를 위해 작동하지 않았습니다.나는 그것을 소비자 앱에 추가하고
- fetch.message.max.bytes
서버를 다시 시작하십시오.
자세한 정보는이 설명서를 살펴보십시오. http://kafka.apache.org/08/configuration.html
KAFKA 생산자로부터 KAFKA 브로커로 보내고 KAFKA 소비자 I.E가받은 메시지의 동등한 크기를 갖는 것입니다.
Kafka Producer -> Kafka Broker -> Kafka Consumer
요구 사항이 15MB의 메시지를 보내는 경우, 생산자 , 브로커 및 소비자 을 전송하는 것입니다.동기화되어 있어야합니다.
Kafka Producer 15MB -> -> 을 <강력한> Kafka Consumer 은 15MB를받습니다
설정은 다음과 같아야합니다 :
a) 브로커에서 :
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
.
b) 소비자 :
fetch.message.max.bytes=15728640
. message.max.bytes
속성이 소비자의 fetch.message.max.bytes
속성을 사용하여 Sync 에서 이어야한다는 것을 기억해야합니다.가져 오기 크기는 최소한 최대 메시지 크기만큼 크지 않아야합니다. 그렇지 않으면 생산자가 소비자가 소비자가 소비 / 가져올 수있는 메시지를 보낼 수있는 상황이있을 수 있습니다.그것은 그것을 봐주는 가치가있을 것입니다.
어떤 버전의 Kafka를 사용하고 있습니까?또한 당신이 얻는 더 자세한 내용을 제공하십시오.거기에 있습니다 ... 로그에 payload size of xxxx larger
than 1000000
가 나오는가?
@laughing_man의 답변은 매우 정확합니다. 그러나 나는 Quora에서 Kafka 전문가 Stephane Maarek 에서 배운 추천을주고 싶었습니다.
kafka는 큰 메시지를 처리하기위한 것이 아닙니다.
API는 클라우드 스토리지 (EX AWS S3)를 사용해야하며 KAFKA 또는 S3의 참조를 참조하십시오. 당신은 당신의 데이터를 지속할지, 어쩌면 그것은 네트워크 드라이브 일 것입니다, 어쩌면 그것은 무엇이든지, 메시지 브로커가 아니어야합니다.
이제 위의 해결책을 사용하지 않으려면
메시지 최대 크기는 1MB입니다 (브로커의 설정은 message.max.bytes
라고합니다) Apache Kafka 당신이 정말로 그것이 나쁘게 필요하다면, 그 크기를 늘리고 생산자와 소비자의 네트워크 버퍼를 늘릴 수 있습니다.
메시지가 텍스트 기반 (GZIP, SNAPPT, LZ4 압축)이지만 마술 적으로는 아니지만 마술 적이 아닌 경우 압축을 탐색 할 수 있습니다.
다시 해당 데이터를 저장하고 외부 참조를 KAFKA에 푸시하려면 외부 시스템을 사용해야합니다. 그것은 매우 일반적인 아키텍처이며, 당신이 가서 널리 받아 들여야합니다.
kafka는 메시지가 거대하지만 크기가 아니라 크기가 거의없는 경우에만 KAFKA가 작동합니다.
출처 : https : / /www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka