Spring Kafka – Consumer and Producer Example
This tutorial demonstrates how to send and receive messages using Spring Kafka. We start by creating a Spring Kafka Producer that can send messages to a Kafka topic. Next, we create a Spring Kafka Consumer that listens for messages sent to a Kafka topic. We configure both with appropriate key/value serializers and deserializers. Finally, we demonstrate everything with a simple Spring Boot application.
Download and Install Apache Kafka
To download and install Apache Kafka, please read the official documentation here. This tutorial assumes that the server is started using the default configuration and that no server ports are changed.
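For reference, with the default configuration you typically start ZooKeeper and the broker from the Kafka installation directory like this (commands taken from the Kafka quickstart; adjust paths to your installation):

# start ZooKeeper (the default broker configuration expects it on localhost:2181)
bin/zookeeper-server-start.sh config/zookeeper.properties
# start the Kafka broker on localhost:9092
bin/kafka-server-start.sh config/server.properties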
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Project Structure
Let’s start by looking at the project structure.
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the classpath.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>producer-consumer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com/spring-kafka-consumer-producer-example</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Spring Kafka: Sending Messages to a Topic
Let’s start by sending messages. We use the KafkaTemplate class, which wraps a Producer and provides high-level operations to send data to Kafka topics. Both asynchronous and synchronous methods are provided, with the asynchronous methods returning a ListenableFuture.
package com.memorynotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
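The send() call above is fire-and-forget: it returns immediately, and a delivery failure would only surface in the producer's own logging. If you need delivery confirmation, the ListenableFuture returned by KafkaTemplate.send() accepts a callback. The following method is a minimal sketch of that pattern; it is not part of the original Sender class, and the name sendWithCallback is our own:

// additional imports needed:
// org.springframework.kafka.support.SendResult
// org.springframework.util.concurrent.ListenableFuture
// org.springframework.util.concurrent.ListenableFutureCallback

public void sendWithCallback(String message) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            // the broker acknowledged the record; log where it landed
            LOG.info("sent message='{}' to partition={} with offset={}", message,
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
        }
        @Override
        public void onFailure(Throwable ex) {
            LOG.error("unable to send message='{}'", message, ex);
        }
    });
}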
In order to successfully send messages to a Kafka topic, we need to configure the KafkaTemplate. This configuration is handled by the SenderConfig class.

We configure the KafkaTemplate using an implementation of the ProducerFactory interface, more specifically the DefaultKafkaProducerFactory. We can initialize this producer factory using a Map<String, Object>, with keys taken from the ProducerConfig class.
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG specifies a list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG specifies the serializer class for keys, which implements the org.apache.kafka.common.serialization.Serializer interface.
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG specifies the serializer class for values, which implements the org.apache.kafka.common.serialization.Serializer interface.
For a complete list of configuration options, take a look at the ProducerConfig class.
package com.memorynotfound.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Spring Kafka: Listening to Messages from a Topic
Next, we’ll show how to listen to messages from a Kafka topic. The Receiver class consumes messages from a Kafka topic. We created the listen() method and annotated it with the @KafkaListener annotation, which marks the method as the target of a Kafka message listener on the specified topics.
package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        LOG.info("received message='{}'", message);
    }
}
This mechanism requires an @EnableKafka annotation on one of the @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer.

It is important that the key/value deserializers match the key/value serializers we used in the SenderConfig class.
- ConsumerConfig.GROUP_ID_CONFIG specifies a unique string that identifies the consumer group this consumer belongs to.
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG specifies what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
  - earliest: automatically reset the offset to the earliest offset
  - latest: automatically reset the offset to the latest offset
  - none: throw an exception to the consumer if no previous offset is found for the consumer’s group
  - anything else: throw an exception to the consumer
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances. If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
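To make this concrete, here is a hypothetical variation on the Receiver (not part of this tutorial's code) with two listener methods on the same topic but in different consumer groups, so each published record is delivered to both methods. It assumes the groupId attribute of @KafkaListener, which overrides the group id configured on the consumer factory:

package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class BroadcastReceiver {

    private static final Logger LOG = LoggerFactory.getLogger(BroadcastReceiver.class);

    // different group ids: every record on the topic reaches both methods;
    // with the *same* group id, the two listeners would instead split the records
    @KafkaListener(topics = "${app.topic.foo}", groupId = "group-a")
    public void listenGroupA(@Payload String message) {
        LOG.info("group-a received message='{}'", message);
    }

    @KafkaListener(topics = "${app.topic.foo}", groupId = "group-b")
    public void listenGroupB(@Payload String message) {
        LOG.info("group-b received message='{}'", message);
    }
}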
For a complete list of configuration options, take a look at the ConsumerConfig class.
package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
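As a side note, the same factory can also run several listener threads. The following is a minimal sketch using the factory's setConcurrency setter; whether it helps depends on the topic's partition count, an assumption worth checking in your setup:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // run three listener containers; each receives a share of the topic's
    // partitions, so this only adds parallelism when the topic has 3+ partitions
    factory.setConcurrency(3);
    return factory;
}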
Configure Application with application.yml
We also create an application.yml properties file, which is located in the src/main/resources folder. These properties are injected into the configuration classes by Spring Boot.
spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG
Running the Application
Finally, we wrote a simple Spring Boot application to demonstrate the example. In order for this demo to work, we need a Kafka server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProducerConsumerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ProducerConsumerApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}
Demo
When we run the application, we receive the following output.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='Spring Kafka Producer and Consumer Example' to topic='foo.t'
received message='Spring Kafka Producer and Consumer Example'
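The pom above also declares spring-kafka-test, so you can exercise the same flow without a locally running broker. The following test is a rough sketch assuming the KafkaEmbedded JUnit rule shipped with spring-kafka-test 2.1.x; the class name ProducerConsumerApplicationTests and the test body are our own invention, not part of the original tutorial:

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducerConsumerApplicationTests {

    // one embedded broker with controlled shutdown and the topic pre-created
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "foo.t");

    @BeforeClass
    public static void setUp() {
        // point the application at the embedded broker instead of localhost:9092;
        // this runs before the Spring context is created
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Autowired
    private Sender sender;

    @Test
    public void sendsMessageToEmbeddedBroker() {
        sender.send("Spring Kafka Producer and Consumer Example");
    }
}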
References
- Apache Kafka Official Website
- Spring Kafka Client Compatibility
- Spring Kafka Documentation
- Spring Kafka JavaDoc