Spring Kafka – Adding Custom Header to Kafka Message Example
In this tutorial we demonstrate how to add/read custom headers to/from a Kafka Message using Spring Kafka. We start by adding headers using either Message<?>
or ProducerRecord<String, String>
. We then read the values inside the KafkaListener
using @Header
annotation and MessageHeaders
class.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path. Important: We need to include the com.fasterxml.jackson.core:jackson-databind
dependency for working with rich header values.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the message-headers example project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- project coordinates -->
<groupId>com.memorynotfound.spring.kafka</groupId>
<artifactId>message-headers</artifactId>
<version>1.0.0-SNAPSHOT</version>
<url>http://memorynotfound.com</url>
<name>Spring Kafka - ${project.artifactId}</name>
<!-- inherit build defaults from the Spring Boot starter parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- referenced below by the spring-kafka and spring-kafka-test dependencies -->
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
<!-- NOTE(review): spring-boot.version is not referenced anywhere in this POM -->
<spring-boot.version>2.0.0.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<!-- for serializing/deserializing complex headers -->
<!-- no explicit version here; presumably managed by the parent POM (confirm) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot's Maven plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The 0.11.0.0 Apache Kafka client introduced support for headers in messages. Spring for Apache Kafka supports mapping these headers to/from MessageHeaders
since version 2.0.
Sending Custom Headers with Spring Kafka
Let’s start by adding custom header values to a Kafka Message. We can add headers to a Kafka message using either Message<?>
or ProducerRecord<String, String>
as shown in the following code.
package com.memorynotfound.kafka.producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Publishes demo messages that carry a custom {@code X-Custom-Header} header.
 *
 * <p>Two ways of attaching the header are shown: through a Spring {@link Message}
 * ({@link #sendFoo(String)}) and through a raw Kafka {@link ProducerRecord}
 * ({@link #sendBar(String)}).
 */
@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    /** Name of the custom header attached to every outgoing message. */
    private static final String CUSTOM_HEADER = "X-Custom-Header";

    /** Value sent in {@link #CUSTOM_HEADER}. */
    private static final String CUSTOM_HEADER_VALUE = "Sending Custom Header with Spring Kafka";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topicFoo;

    @Value("${app.topic.bar}")
    private String topicBar;

    /**
     * Sends {@code data} to the "foo" topic as a Spring {@link Message}.
     *
     * <p>Topic, message key ({@code "999"}), partition ({@code 0}) and the custom header
     * are all supplied as message headers.
     *
     * @param data the message payload
     */
    public void sendFoo(String data) {
        Message<String> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topicFoo)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "999")
                .setHeader(KafkaHeaders.PARTITION_ID, 0)
                .setHeader(CUSTOM_HEADER, CUSTOM_HEADER_VALUE)
                .build();
        LOG.info("sending message='{}' to topic='{}'", data, topicFoo);
        kafkaTemplate.send(message);
    }

    /**
     * Sends {@code data} to the "bar" topic as a Kafka {@link ProducerRecord}
     * (partition {@code 0}, key {@code "111"}) with the custom header attached as raw bytes.
     *
     * @param data the message payload
     */
    public void sendBar(String data) {
        List<Header> headers = new ArrayList<>();
        // Encode with an explicit charset: String.getBytes() without one uses the
        // platform default, which can differ between producer and consumer hosts.
        headers.add(new RecordHeader(
                CUSTOM_HEADER,
                CUSTOM_HEADER_VALUE.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        ProducerRecord<String, String> bar = new ProducerRecord<>(topicBar, 0, "111", data, headers);
        LOG.info("sending message='{}' to topic='{}'", data, topicBar);
        kafkaTemplate.send(bar);
    }
}
We configure the KafkaTemplate
inside the SenderConfig
class. For simplicity we used a StringSerializer
for both key and value fields.
package com.memorynotfound.kafka.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer-side Spring configuration: builds the producer settings, the
 * {@link ProducerFactory} and the {@link KafkaTemplate} with String keys and values.
 */
@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer settings: bootstrap servers plus {@link StringSerializer} for key and value.
     *
     * @return a mutable map of Kafka producer properties
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return settings;
    }

    /**
     * @return a producer factory created from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
Reading Custom Header Values with Spring Kafka
Previously we saw how to send custom header values. Now we are going to read those values.
We have a couple of options. We can inject each header individually using the @Header
annotation. Or we can inject the MessageHeaders
which you can use to iterate over each header. You can use whichever you find more suitable.
package com.memorynotfound.kafka.consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
 * Consumes the demo messages and logs their payload and headers.
 *
 * <p>Two styles of header access are shown: one {@link Header} parameter per header,
 * and a single {@link MessageHeaders} parameter holding all of them.
 */
@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    /** Name of the custom header the listeners look for. */
    private static final String CUSTOM_HEADER = "X-Custom-Header";

    /**
     * Handles messages from the "foo" topic, receiving each header as its own parameter.
     *
     * @param data          the message payload
     * @param offset        value of the {@link KafkaHeaders#OFFSET} header
     * @param consumer      value of the {@link KafkaHeaders#CONSUMER} header
     * @param timestampType value of the {@link KafkaHeaders#TIMESTAMP_TYPE} header
     * @param topic         value of the {@link KafkaHeaders#RECEIVED_TOPIC} header
     * @param partitionId   value of the {@link KafkaHeaders#RECEIVED_PARTITION_ID} header
     * @param messageKey    value of the {@link KafkaHeaders#RECEIVED_MESSAGE_KEY} header
     * @param timestamp     value of the {@link KafkaHeaders#RECEIVED_TIMESTAMP} header
     * @param customHeader  value of the {@code X-Custom-Header} header
     */
    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String data,
                        @Header(KafkaHeaders.OFFSET) Long offset,
                        @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
                        @Header(KafkaHeaders.TIMESTAMP_TYPE) String timestampType,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp,
                        @Header(CUSTOM_HEADER) String customHeader) {
        LOG.info("- - - - - - - - - - - - - - -");
        LOG.info("received message='{}'", data);
        LOG.info("consumer: {}", consumer);
        LOG.info("topic: {}", topic);
        LOG.info("message key: {}", messageKey);
        LOG.info("partition id: {}", partitionId);
        LOG.info("offset: {}", offset);
        LOG.info("timestamp type: {}", timestampType);
        LOG.info("timestamp: {}", timestamp);
        LOG.info("custom header: {}", customHeader);
    }

    /**
     * Handles messages from the "bar" topic and logs every header it carries.
     *
     * @param data           the message payload
     * @param messageHeaders all headers of the received message
     */
    @KafkaListener(topics = "${app.topic.bar}")
    public void receive(@Payload String data,
                        @Headers MessageHeaders messageHeaders) {
        LOG.info("- - - - - - - - - - - - - - -");
        LOG.info("received message='{}'", data);
        messageHeaders.forEach((key, value) -> {
            // Decode the custom header only when it actually arrived as raw bytes, and
            // with an explicit charset: new String(byte[]) alone uses the platform
            // default. Any other value type is logged as-is rather than failing on a cast.
            if (CUSTOM_HEADER.equals(key) && value instanceof byte[]) {
                LOG.info("{}: {}", key,
                        new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
            } else {
                LOG.info("{}: {}", key, value);
            }
        });
    }
}
The DefaultKafkaHeaderMapper
maps the key to the MessageHeaders
header name. To support rich content, it automatically converts objects to JSON. You can optionally initialize the DefaultKafkaHeaderMapper
using your own ObjectMapper
and patterns
.
package com.memorynotfound.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import java.util.HashMap;
import java.util.Map;
/**
 * Consumer-side Spring configuration: consumer settings, the {@link ConsumerFactory},
 * the listener container factory and a {@link DefaultKafkaHeaderMapper} bean.
 */
@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Consumer settings: bootstrap servers, {@link StringDeserializer} for key and value,
     * consumer group {@code "foo"} and {@code "earliest"} offset reset.
     *
     * @return a mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return settings;
    }

    /**
     * @return a consumer factory created from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * @return a concurrent listener container factory using {@link #consumerFactory()}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        final ConsumerFactory<String, String> consumers = consumerFactory();
        containerFactory.setConsumerFactory(consumers);
        return containerFactory;
    }

    /**
     * Exposes a {@link DefaultKafkaHeaderMapper} created with its no-arg constructor.
     *
     * <p>NOTE(review): nothing in this class passes this bean to the container factory;
     * confirm whether it is picked up elsewhere.
     *
     * @return a new header mapper
     */
    @Bean
    public DefaultKafkaHeaderMapper headerMapper() {
        return new DefaultKafkaHeaderMapper();
    }
}
Configure with application.yml
We also create an application.yml
properties file which is located in the src/main/resources
folder. These properties are injected into the configuration classes by Spring Boot.
# Kafka connection, read via ${spring.kafka.bootstrap-servers}
spring:
  kafka:
    bootstrap-servers: localhost:9092

# Topic names, read via ${app.topic.foo} and ${app.topic.bar}
app:
  topic:
    foo: foo.t
    bar: bar.t

# Log levels per logger name
logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG
Running with Spring Boot
Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost
on port 9092
, which is the default configuration of Kafka.
package com.memorynotfound.kafka;
import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the demo. After startup it sends the same payload
 * once through {@code Sender.sendFoo} and once through {@code Sender.sendBar}.
 */
@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    @Autowired
    private Sender sender;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed on to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    /**
     * Sends the demo payload to both topics.
     *
     * @param strings command-line arguments (unused)
     */
    @Override
    public void run(String... strings) throws Exception {
        final String payload = "Spring Kafka Custom Header Example";
        sender.sendFoo(payload);
        sender.sendBar(payload);
    }
}
Output
When we run the application we receive the following output.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
No active profile set, falling back to default profiles: default
sending message='Spring Kafka Custom Header Example' to topic='foo.t'
sending message='Spring Kafka Custom Header Example' to topic='bar.t'
- - - - - - - - - - - - - - -
received message='Spring Kafka Custom Header Example'
X-Custom-Header: Sending Custom Header with Spring Kafka
kafka_offset: 49
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@1c151fa8
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: 111
kafka_receivedPartitionId: 0
kafka_receivedTopic: bar.t
kafka_receivedTimestamp: 1520322866725
- - - - - - - - - - - - - - -
received message='Spring Kafka Custom Header Example'
consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7e4dcc64
topic: foo.t
message key: 999
partition id: 0
offset: 137
timestamp type: CREATE_TIME
timestamp: 1520322866701
custom header: Sending Custom Header with Spring Kafka
References
- Apache Kafka Official Website
- Spring Kafka Client Compatibility
- Spring Kafka Documentation
- Spring Kafka JavaDoc
- Spring Kafka Message Headers Documentation