How to deal with oversized Kafka documents in Logstash?

Issue description

 

Kafka does not accept documents because the documents are too large.

Increasing the limits does not help: I have already raised the limit to 10 MB and some Logstash events are still not sent to Kafka.

After some time this fills the Logstash queue, which in turn suspends the entire pipeline.

What is the best way to solve the above problem?

Logs
[2020-09-03T00:53:38,603][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.}

[2020-09-03T00:53:38,644][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

[2020-09-03T00:53:38,769][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1223210 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.}

[2020-09-03T00:53:38,770][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

[2020-09-03T00:53:38,878][INFO ][logstash.outputs.kafka ] Exhausted user-configured retry count when sending to Kafka. Dropping these events. {:max_retries=>1, :drop_count=>1}

[2020-09-03T02:15:12,763][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.}

[2020-09-03T02:15:12,764][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

[2020-09-03T02:15:12,871][WARN ][logstash.outputs.kafka ] KafkaProducer.send() failed: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. {:exception=>java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1216262 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.}

[2020-09-03T02:15:12,871][INFO ][logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1}

  

Issue solution

 

Depending on your needs, there are three solutions that should work:

1. Tagging documents whose message field exceeds 10,000 characters.
Tagged documents can then be routed to, for example, a file output (file {}) in the output section, where they can be reviewed and parsed so that the message field is split into appropriate fields. In this case, large documents bypass the kafka output and the Logstash pipeline does not fill up.

filter {
  ruby {
    code => "
      # Tag events whose message field exceeds 10,000 characters;
      # guard against events without a message field
      msg = event.get('message')
      if msg && msg.length > 10000
        event.tag('TLTR')
      end
    "
  }
}
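A minimal sketch of the matching output section; tagged events are written to a file instead of Kafka. The broker address, topic name, and file path are hypothetical placeholders:

```
output {
  if "TLTR" in [tags] {
    file {
      path => "/var/log/logstash/oversized-%{+YYYY.MM.dd}.log"  # hypothetical path
    }
  } else {
    kafka {
      bootstrap_servers => "kafka:9092"  # hypothetical broker
      topic_id => "logs"                 # hypothetical topic
    }
  }
}
```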

2. Truncate + tagging.
The document is truncated after the specified number of bytes and tagged, so truncated messages can be identified later.
In this case, large documents are truncated, accepted on the Kafka side, and the Logstash pipeline does not fill up.

filter {
  truncate {
    fields => ["message"]
    # Choose a value below the producer's max.request.size
    # (and the broker's message.max.bytes), or records will still be rejected
    length_bytes => 49999999
    add_tag => "TLTR"
  }
}
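The byte-based cutoff can be sketched in plain Ruby. This is an illustration of the semantics only, not the truncate plugin's implementation:

```ruby
# Truncate a string to at most `limit` bytes, as the truncate filter
# does with length_bytes (cutting on bytes, not characters).
message = "ab" * 30_000_000          # 60,000,000 bytes of ASCII
limit   = 49_999_999                 # matches length_bytes above
truncated = message.byteslice(0, limit)
puts truncated.bytesize              # 49999999
```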

3. Drop.
Useful when we know that the "large documents" contain irrelevant information and we can afford to lose them. In this case, a document rejected by Kafka is returned to the queue for only one retry and then dropped, without clogging the Logstash pipeline.
In the output section we must add:

retries => 1
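For context, a minimal kafka output sketch with this setting in place; the broker address and topic name are hypothetical placeholders:

```
output {
  kafka {
    bootstrap_servers => "kafka:9092"  # hypothetical broker
    topic_id => "logs"                 # hypothetical topic
    retries => 1                       # drop the event after a single retry
  }
}
```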