Spring Kafka and Spring Boot Configuration Example
In the following tutorial we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot uses sensible default to configure Spring Kafka. We can override these defaults using the application.yml
property file.
Project Setup
Spring Kafka
: 2.1.4.RELEASESpring Boot
: 2.0.0.RELEASEApache Kafka
: kafka_2.11-1.0.0Maven
: 3.5
Previously we saw how to create a spring kafka consumer and producer which manually configures the Producer
and Consumer
. In this example we’ll use Spring Boot to automatically configure them for us using sensible defaults.
Download and Install Apache Kafka
To download and install Apache Kafka, please read the official documentation here. This tutorial assumes that server is started using the default configuration and no server ports are changed.
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.memorynotfound.spring.kafka</groupId>
<artifactId>spring-boot</artifactId>
<version>1.0.0-SNAPSHOT</version>
<url>http://memorynotfound.com</url>
<description>Spring Kafka Spring Boot Configuration Example</description>
<name>Spring Kafka - ${project.artifactId}</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<!-- testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Sending Spring Kafka Messages with Spring Boot
Spring Boot automatically configures and initializes a KafkaTemplate
based on the properties configured in the application.yml
property file. By using the @Service
annotation we make the Sender
class eligible for the spring container to do auto discovery.
package com.memorynotfound.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Sender {
private static final Logger LOG = LoggerFactory.getLogger(Sender.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic.foo}")
private String topic;
public void send(String message){
LOG.info("sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic, message);
}
}
Receiving Kafka Messages with Spring Boot
The ConcurrentKafkaListenerContainerFactory
and KafkaMessageListenerContainer
beans are also automatically configured by Spring Boot. You can optionally configure these beans using the application.yml
property file.
By annotating a method with @KafkaListener
annotation Spring Kafka will automatically create a message listener container.
package com.memorynotfound.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class Receiver {
private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);
@KafkaListener(topics = "${app.topic.foo}")
public void receive(@Payload String message,
@Headers MessageHeaders headers) {
LOG.info("received message='{}'", message);
headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
}
}
Configure Application with application.yml
Spring Boot tries to automatically configure your application with sensible defaults based on the specified dependencies inside your pom.xml
file. We haven’t configured any Consumer
, Producer
or KafkaTemplate
beans, spring boot will auto-configure them using spring boot default values. These values can be overridden using the application.yml
property file. You can find more information about Spring Boot Kafka Properties.
We also create a application.yml
properties file which is located in the src/main/resources
folder. These properties are injected in the configuration classes by spring boot.
spring:
kafka:
consumer:
group-id: foo
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
app:
topic:
foo: foo.t
logging:
level:
root: WARN
org.springframework.web: INFO
com.memorynotfound: DEBUG
Running the Application
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost
on port 9092
, which is the default configuration of Kafka.
package com.memorynotfound.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Autowired
private Sender sender;
@Override
public void run(String... strings) throws Exception {
sender.send("Spring Kafka and Spring Boot Configuration Example");
}
}
Demo
When we run the application we receive the following output.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186)
sending message='Spring Kafka and Spring Boot Configuration Example' to topic='foo.t'
received message='Spring Kafka and Spring Boot Configuration Example'
kafka_offset: 144
kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@6d75d48b
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: foo.t
kafka_receivedTimestamp: 1520508611795
References
- Apache Kafka Official Website
- Spring Kafka Client Compatability
- Spring Kafka Documentation
- Spring Kafaka JavaDoc
- Spring Boot Common Application Properties