Friday, August 21, 2020

Throttling Kafka Consumption - Dispelling Myths

Kafka was designed as a messaging broker for high-throughput publish and subscribe. At times this very fact can become a problem when Kafka is used in more traditionally designed transactional systems. For example, if an application consumes messages from a Kafka topic and inserts or updates rows in a relational database, the high rate of consumption can put pressure on transactional persistence in the RDBMS and cause exceptions. Hence the need arises to throttle message consumption from a Kafka topic.

To start, let's assume that we are only considering message consumption inside a single consumer instance (which may be part of a bigger consumer group). So we have a single consumer instance picking up Kafka messages from a single partition of a topic.

Most Kafka client frameworks implement a decent consumer poll loop that reads messages from the partition as soon as possible and hands them over to the client application as fast as possible, without waiting or blocking for the application to process each message.

For single-threaded technologies like Node.js this can be an issue, because the application ends up processing a large number of messages in parallel. Many will resort to Kafka configuration parameters like the ones below to control the rate of consumption, with very little success, for the reasons I mention after the list:

fetch.min.bytes - This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer. 

fetch.max.wait.ms - By setting fetch.min.bytes, you tell Kafka to wait until it has enough data to send before responding to the consumer. fetch.max.wait.ms caps how long the broker will wait (500 ms by default). If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.

max.partition.fetch.bytes - This property controls the maximum number of bytes the server will return per partition.

session.timeout.ms - The amount of time a consumer can be out of contact with the brokers while still being considered alive. It defaulted to 10 seconds at the time of writing (the default was raised to 45 seconds in Kafka 3.0).

max.poll.records - This controls the maximum number of records that a single call to poll() will return.

receive.buffer.bytes and send.buffer.bytes - These are the sizes of the TCP send and receive buffers used by the sockets when writing and reading data.
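
To make the list concrete, here is a sketch of what such a tuning attempt might look like, written as a plain TypeScript object. The property names are the ones described above; the values are illustrative assumptions only, and how the object is handed to a client depends on the library you use (many Node.js clients use their own option names).

```typescript
// Illustrative consumer tuning. The values are assumptions chosen for the
// example, not recommendations.
const consumerConfig: Record<string, number> = {
  "fetch.min.bytes": 1048576,           // broker waits for at least 1 MB ...
  "fetch.max.wait.ms": 100,             // ... or 100 ms, whichever comes first
  "max.partition.fetch.bytes": 1048576, // cap on bytes returned per partition
  "session.timeout.ms": 10000,          // consumer considered dead after 10 s of silence
  "max.poll.records": 50,               // cap on records returned by one poll()
  "receive.buffer.bytes": 65536,        // TCP receive buffer size
  "send.buffer.bytes": 131072,          // TCP send buffer size
};

console.log(consumerConfig);
```

Every one of these settings shapes how data is fetched. None of them makes the consumer wait for the application to finish processing a message.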

If we manipulate the above values, messages will be fetched more or less frequently. But if thousands of messages are already lying in the topic, it won't matter a lot, given that a fetch from topic to consumer is very quick (typically a matter of milliseconds). The consumer will still see a steady supply of messages arriving almost in parallel.
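
The effect can be illustrated with a small simulation that uses no Kafka client at all. Everything in it is hypothetical: peakInFlight stands in for a poll loop that hands messages over without awaiting them, the 1 ms pause stands in for the fetch, and the 300 ms pause stands in for a slow database write.

```typescript
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Simulates a poll loop over `backlog` waiting messages, fetched in batches
// of `maxPollRecords`, and returns the highest number of messages that were
// being processed at the same moment.
async function peakInFlight(backlog: number, maxPollRecords: number): Promise<number> {
  let inFlight = 0;
  let peak = 0;
  const pending: Promise<void>[] = [];

  const handle = async (): Promise<void> => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await sleep(300); // assumed cost of the database write
    inFlight--;
  };

  for (let offset = 0; offset < backlog; offset += maxPollRecords) {
    const batchSize = Math.min(maxPollRecords, backlog - offset);
    for (let i = 0; i < batchSize; i++) {
      pending.push(handle()); // handed over, not awaited
    }
    await sleep(1); // assumed fetch latency before the next poll
  }

  await Promise.all(pending);
  return peak;
}

Promise.all([peakInFlight(100, 10), peakInFlight(100, 50)]).then(([small, large]) => {
  console.log(`batch of 10 -> peak ${small}, batch of 50 -> peak ${large}`);
});
```

Under these assumptions both runs end up with the whole backlog in flight at once, because the batch size only changes how many polls it takes to get there.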

Even playing around with offset commits will never block consumption of messages. It only influences recovery, guaranteed delivery, exactly-once delivery, etc., and will not restrict super-fast reading of messages by the consumer. (This is unlike traditional message brokers such as JMS, MQ, etc., where late acknowledgements block further consumption.)

The bottom line is that the challenge of parallel message consumption inside a single Kafka consumer instance needs to be handled by the process/application and not by Kafka configuration.
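
One way to handle this in the application is to cap how many messages are processed at once. The sketch below is a minimal, library-free illustration of that idea; processWithLimit is a hypothetical helper, not part of any Kafka client, and it assumes the surrounding poll loop awaits it before asking for the next batch.

```typescript
// Runs `handler` over `messages` with at most `limit` calls in flight.
// Resolves once every message has been handled.
async function processWithLimit<T>(
  messages: T[],
  limit: number,
  handler: (msg: T) => Promise<void>,
): Promise<void> {
  let next = 0;

  // Each worker pulls the next unprocessed message and awaits it before
  // taking another, so the number of workers is the concurrency cap.
  const worker = async (): Promise<void> => {
    while (next < messages.length) {
      const msg = messages[next++];
      await handler(msg);
    }
  };

  const workerCount = Math.min(limit, messages.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
}

// Usage with a simulated slow write (hypothetical handler).
const demoBatch = Array.from({ length: 10 }, (_, i) => `message-${i}`);
processWithLimit(demoBatch, 3, async (msg) => {
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  console.log(`stored ${msg}`);
});
```

The limit becomes the throttle: a limit of 1 gives strictly sequential processing, and a larger value trades database pressure for throughput.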

Consumer Side Technology

A much more important factor when throttling Kafka messages is the technology and architecture you use in the consumer-side application.

Using a Java / Spring Boot based consumer means the consumer process runtime is multi-threaded and synchronous first, or synchronous by default. Thanks to thread pools in Java/Spring, and the fact that almost all calls in Java are synchronous by default, it is much easier to control Kafka message consumption in Java.

Node.js based consumers are inherently single threaded and hence asynchronous first, or asynchronous by default. They rely on await, promises and callbacks in a way that makes controlling near-parallel consumption of Kafka messages a challenge, especially for less-experienced developers.
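
A small sketch of the pitfall, with no Kafka client involved. All names here are hypothetical: makeHandler simulates a slow write and records how many calls overlap, and the two consume functions differ only in where they await.

```typescript
function pause(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Simulated slow write that tracks the highest number of overlapping calls.
function makeHandler(workMs: number) {
  let inFlight = 0;
  let peak = 0;
  const handle = async (_msg: string): Promise<void> => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await pause(workMs);
    inFlight--;
  };
  return { handle, peak: () => peak };
}

// Starts every handler before awaiting any of them, so the whole batch runs
// together. forEach with an async callback starts them the same way, and
// additionally nothing waits for them to finish.
async function consumeAllAtOnce(
  batch: string[],
  handle: (msg: string) => Promise<void>,
): Promise<void> {
  await Promise.all(batch.map((msg) => handle(msg)));
}

// Awaits each message before starting the next one.
async function consumeOneAtATime(
  batch: string[],
  handle: (msg: string) => Promise<void>,
): Promise<void> {
  for (const msg of batch) {
    await handle(msg);
  }
}
```

The two functions look almost identical, which is why this is easy to get wrong: the first lets the whole batch hit the database together, while the second never has more than one write in progress.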