Co zrobić ze zbyt dużymi dokumentami Kafki w Logstashu?

Opis problemu

 

Kafka nie przyjmuje dokumentów ponieważ są one za duże.

Zwiększanie limitów nic nie daje, bo osiągnięto poziom 10MB i dalej niektórych zdarzeń logstash nie jest w stanie ich wysłać do kafki.

Po czasie skutkuje to zapełnieniem kolejki w logstashu, co w konsekwencji prowadzi do zawieszenia całego pipeline...

Jak najlepiej rozwiązać taki problem?

Logi
[2020-09-03T00:53:38,603][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.} [2020-09-03T00:53:38,644][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay . {:batch_size=>1, :failures=>1, :sleep=>0.1} [2020-09-03T00:53:38,769][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er rors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.} [2020-09-03T00:53:38,770][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay . {:batch_size=>1, :failures=>1, :sleep=>0.1} [2020-09-03T00:53:38,878][INFO ][logstash.outputs.kafka ] Exhausted user-configured retry count when sending to K afka. Dropping these events. {:max_retries=>1, :drop_count=>1} [2020-09-03T02:15:12,763][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.} [2020-09-03T02:15:12,764][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay . {:batch_size=>1, :failures=>1, :sleep=>0.1} [2020-09-03T02:15:12,871][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.er rors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionExcep tion: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.} [2020-09-03T02:15:12,871][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay . {:batch_size=>1, :failures=>1, :sleep=>0.1}

&nbsp&nbsp

Issue solution

 

W zależności od potrzeb proponujemy 3 rozwiązania które powinny się sprawdzić:

1. Tagowanie dokumentów posiadających więcej niż 10000 znaków w polu message.
Taki dokument można w sekcji output skierować np pliku - output file{}, a następnie podejrzeć i odpowiednio sparsować tak żeby pole message było już pocięte na odpowiednie pola. W tym przypadku duże dokumenty będą pomijały output kafki i pipeline logstasha się nie zapełni.


filter&nbsp{
&nbsp&nbspruby&nbsp{
&nbsp&nbsp&nbsp&nbspcode&nbsp=>&nbsp"
&nbsp&nbsp&nbsp&nbsp&nbsp&nbspif&nbspevent.get('message').length&nbsp>&nbsp10000
&nbsp&nbsp&nbsp&nbsp&nbsp&nbspevent.tag('TLTR')
&nbsp&nbsp&nbsp&nbsp&nbsp&nbspend
&nbsp&nbsp&nbsp&nbsp"
&nbsp&nbsp}
}

2. Truncate + Tagowanie.
Dokument zostanie obcięty po określonej liczbie bajtów oraz otagowany tak żeby było wiadomo który message jest ucięty.
W tym przypadku duże dokumenty będą ucinane i zostaną poprawnie odebrane po stronie kafki, a pipeline logstasha się nie zapełni

filter&nbsp{
&nbsp&nbsptruncate&nbsp{
&nbsp&nbsp&nbsp&nbspfields&nbsp=>&nbsp["message"]
&nbsp&nbsp&nbsp&nbsplength_bytes&nbsp=>&nbsp49999999
&nbsp&nbsp&nbsp&nbspadd_tag&nbsp=>&nbsp"TLTR"
&nbsp&nbsp}
}

3. Drop.
Rozwiązanie przydatne wtedy, gdy wiemy że "duże dokumenty" zawierają nieistotne informacje i możemy pozwolić sobie na ich utratę.
W tym wypadku dokument który zostanie odbity od kafki, wróci do kolejki tylko jede raz, a następnie zostanie porzucony. Pipeline logstasha przez to nie będzie utrzymywał tego dokumentu.
W sekcji output musimy dodać:

retries=> 1