Spring Kafka – Batch Listener Example
In this tutorial we demonstrate how to set up a batch listener using Spring Kafka, Spring Boot and Maven. We start by configuring the BatchListener, and optionally configure a BatchErrorHandler. We also demonstrate how to set an upper limit on the number of messages received per batch. Finally, we show how to grab the header values of the individual messages when receiving a batch.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the classpath.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>batch-listener</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <description>Spring Kafka - Batch Listener Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Sending Messages to Kafka
In a previous tutorial we saw how to produce and consume messages using Spring Kafka. The Sender and SenderConfig are identical to that example; a minimal sketch is included below for completeness. In the next section we show how to batch receive messages using a BatchListener.
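Since the producer classes are not repeated in this tutorial, the following sketch shows what a Sender and SenderConfig along those lines could look like. Treat it as an illustrative assumption: the app.topic.batch property and the log message simply mirror the application.yml and the demo output further down.

package com.memorynotfound.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;

// SenderConfig.java - producer factory and KafkaTemplate (sketch)
@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// Sender.java - sends a message to the configured topic (sketch, separate source file)
@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // assumed property name; matches the app.topic.batch entry in application.yml below
    @Value("${app.topic.batch}")
    private String topic;

    public void send(String message) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}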
Configuring a Batch Listener
Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records returned by the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property of the ConcurrentKafkaListenerContainerFactory to true.

We can optionally register a BatchErrorHandler by calling ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler() and providing our error handler implementation; a custom implementation is sketched after the configuration class below.

We can set an upper limit for the batch size by setting ConsumerConfig.MAX_POLL_RECORDS_CONFIG to a value that suits us. By default, the number of records received in each batch is dynamically calculated. In the following example we configure the upper limit to 5.
package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }
}
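The BatchLoggingErrorHandler used above simply logs the failed batch. If you need different behaviour, you can plug in your own implementation of the BatchErrorHandler interface instead. The following is a minimal sketch, not part of the original example; the class name and the logging are assumptions.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.BatchErrorHandler;

// CustomBatchErrorHandler.java - hypothetical custom error handler for batch listeners
public class CustomBatchErrorHandler implements BatchErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(CustomBatchErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> records) {
        // log the failure and every record of the batch that triggered it
        LOG.error("error while processing batch", thrownException);
        if (records != null) {
            for (ConsumerRecord<?, ?> record : records) {
                LOG.error("failed record: topic={}, partition={}, offset={}",
                        record.topic(), record.partition(), record.offset());
            }
        }
    }
}

It could then be registered in the configuration above with factory.getContainerProperties().setBatchErrorHandler(new CustomBatchErrorHandler()) instead of the BatchLoggingErrorHandler.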
Batch Receive Kafka Messages using a Batch Listener
Since we are receiving messages in batches, we need to update the receive() method to accept a List of messages. Alternatively, we can receive a List of Message<?> or ConsumerRecord<?, ?> objects, with the offset and other metadata available in each element, but this list must be the only parameter defined on the method, aside from an optional Acknowledgment when using manual commits; a sketch of that variant is shown after the listener below.

While receiving batch messages it is also possible to access the headers of the individual messages: for each header we want to read, we accept a List of the corresponding values.
package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("beginning to consume batch messages");
        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
        LOG.info("all batch messages consumed");
    }
}
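If you prefer the raw ConsumerRecord objects, or want to commit offsets manually, the listener could instead look like the sketch below. This is not part of the original example: it assumes the container factory is switched to manual acknowledgments, e.g. factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL), and that enable.auto.commit is set to false in the consumer properties.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import java.util.List;

// RecordBatchListener.java - hypothetical alternative to the Listener above
@Service
public class RecordBatchListener {

    private static final Logger LOG = LoggerFactory.getLogger(RecordBatchListener.class);

    @KafkaListener(id = "record-batch-listener", topics = "${app.topic.batch}")
    public void receive(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("received value='{}' from partition-offset='{}-{}'",
                    record.value(), record.partition(), record.offset());
        }
        // manually commit the offsets of the whole batch once processing succeeded
        ack.acknowledge();
    }
}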
Configure Application with application.yml
We also create an application.yml properties file located in the src/main/resources folder. These properties are injected into the configuration classes by Spring Boot.
spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    batch: batch.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG
Running the Application
Finally, we write a simple Spring Boot application that sends a series of messages to the topic. For this demo to work, we need a Kafka server running on localhost on port 9092, which is the default Kafka configuration.
package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        for (int i = 1; i < 13; i++) {
            sender.send("message-" + i);
        }
    }
}
Demo
When we run the application we receive the following output.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='message-1' to topic='batch.t'
sending message='message-2' to topic='batch.t'
sending message='message-3' to topic='batch.t'
sending message='message-4' to topic='batch.t'
sending message='message-5' to topic='batch.t'
sending message='message-6' to topic='batch.t'
sending message='message-7' to topic='batch.t'
sending message='message-8' to topic='batch.t'
sending message='message-9' to topic='batch.t'
sending message='message-10' to topic='batch.t'
sending message='message-11' to topic='batch.t'
sending message='message-12' to topic='batch.t'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-1' with partition-offset='0-295'
received message='message-2' with partition-offset='0-296'
received message='message-3' with partition-offset='0-297'
received message='message-4' with partition-offset='0-298'
received message='message-5' with partition-offset='0-299'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-6' with partition-offset='0-300'
received message='message-7' with partition-offset='0-301'
received message='message-8' with partition-offset='0-302'
received message='message-9' with partition-offset='0-303'
received message='message-10' with partition-offset='0-304'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-11' with partition-offset='0-305'
received message='message-12' with partition-offset='0-306'
all batch messages consumed
References
- Apache Kafka Official Website
- Spring Kafka Client Compatibility
- Spring Kafka Documentation
- Spring Kafka JavaDoc