Spring Kafka – JSON Serializer and Deserializer Example
The following tutorial demonstrates how to send and receive a Java Object as a JSON byte[] to and from Apache Kafka using Spring Kafka, Spring Boot and Maven. We’ll send a Java Object as JSON byte[] to a Kafka Topic using a JsonSerializer. Afterwards we’ll configure how to receive a JSON byte[] and automatically convert it to a Java Object using a JsonDeserializer.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the classpath. Since we are working with JSON, we need to include the Jackson JSON library com.fasterxml.jackson.core:jackson-databind.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>message-conversion-json</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com/spring-kafka-json-serializer-deserializer-example</url>
    <description>Spring Kafka - JSON Serializer Deserializer Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- json support -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Simple POJO to Serialize/Deserialize
In this example we’ll send and receive a Foo object to and from a Kafka topic.
package com.memorynotfound.kafka;

public class Foo {

    private String name;
    private String description;

    public Foo() {
    }

    public Foo(String name, String description) {
        this.name = name;
        this.description = description;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Foo{" +
                "name='" + name + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}
Apache Kafka stores and transports byte[]. It ships with a number of built-in serializers and deserializers, but none of them handle JSON. Spring Kafka provides a JsonSerializer and JsonDeserializer which we can use to convert Java Objects to and from JSON.
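To make the "object to JSON byte[]" step concrete, here is a minimal, JDK-only sketch of what ends up on the wire. It is a hand-rolled toy encoder, not what Spring Kafka does internally (the JsonSerializer delegates to Jackson); the class JsonBytesSketch and its methods are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: a toy encoder for the two Foo fields.
// Spring Kafka's JsonSerializer uses Jackson instead of string concatenation.
final class JsonBytesSketch {

    // Escapes only backslash and double quote; a real JSON encoder handles more cases.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders name and description as a JSON object and encodes the text as UTF-8 bytes.
    static byte[] toJsonBytes(String name, String description) {
        String json = "{\"name\":\"" + escape(name)
                + "\",\"description\":\"" + escape(description) + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a payload back to JSON text, the form it has before it is parsed into an object.
    static String toText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toJsonBytes("Spring Kafka", "sending and receiving JSON messages");
        System.out.println(payload.length + " bytes: " + toText(payload));
    }
}
```

The point of the sketch is only that Kafka itself never sees a Foo: it sees bytes, and the serializer and deserializer on either side agree on how to interpret them.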
Producing JSON messages with Spring Kafka
Let’s start by sending a Foo object to a Kafka Topic. Notice that we use a KafkaTemplate<String, Foo>, since we are sending Java Objects to the Kafka topic that are automatically transformed into a JSON byte[]. In this example we create a Message<Foo> using the MessageBuilder. It’s important to set the KafkaHeaders.TOPIC header so the template knows which topic to send the message to.
package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class FooSender {

    private static final Logger LOG = LoggerFactory.getLogger(FooSender.class);

    @Autowired
    private KafkaTemplate<String, Foo> kafkaTemplate;

    @Value("${app.topic.example}")
    private String topic;

    public void send(Foo data){
        LOG.info("sending data='{}' to topic='{}'", data, topic);

        Message<Foo> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();

        kafkaTemplate.send(message);
    }
}
Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka Properties.
- JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature.
- JsonDeserializer.DEFAULT_KEY_TYPE; fallback type for deserialization of keys if no header information is present.
- JsonDeserializer.DEFAULT_VALUE_TYPE; fallback type for deserialization of values if no header information is present.
- JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.
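The interplay of type headers, fallback types and trusted packages can be sketched in plain Java. This is a simplified illustration of the rules described above, not Spring Kafka's actual implementation: the class TypeHeaderSketch and its methods are hypothetical, and the sketch assumes exact package matches where the real property accepts package patterns.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified illustration only; names and matching rules are assumptions, not library code.
final class TypeHeaderSketch {

    // Picks the class name to deserialize into: the type header when present, otherwise the fallback.
    static String resolveTypeName(byte[] typeIdHeader, String defaultType, Set<String> trustedPackages) {
        if (typeIdHeader == null) {
            return defaultType;
        }
        String className = new String(typeIdHeader, StandardCharsets.UTF_8);
        if (!isTrusted(className, trustedPackages)) {
            throw new IllegalArgumentException("Package of '" + className + "' is not trusted");
        }
        return className;
    }

    // A class is trusted when "*" is configured or its package is listed exactly.
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        Set<String> trusted = new HashSet<>(
                Arrays.asList("java.util", "java.lang", "com.memorynotfound.kafka"));
        byte[] header = "com.memorynotfound.kafka.Foo".getBytes(StandardCharsets.UTF_8);
        System.out.println(resolveTypeName(header, "java.lang.Object", trusted));
        System.out.println(resolveTypeName(null, "com.memorynotfound.kafka.Foo", trusted));
    }
}
```

With only the default trusted packages (java.util, java.lang), a header naming com.memorynotfound.kafka.Foo would be rejected in this sketch, which is why the package of your own payload classes has to be trusted one way or another.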
We need to configure the correct Serializer to support JSON types. We can register this by setting the ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to the JsonSerializer class. Finally, we need to set the correct value type for our ProducerFactory and KafkaTemplate to Foo.
package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class FooSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Foo> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Foo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
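Because the serializer is configured through ordinary Kafka properties, the producer settings above can also be written with literal keys and class names. The sketch below builds such a map without any Kafka or Spring imports. The keys bootstrap.servers, key.serializer and value.serializer are the standard Kafka producer property names; the key spring.json.add.type.headers is assumed to be the literal behind JsonSerializer.ADD_TYPE_INFO_HEADERS and should be checked against the Spring Kafka version in use. ProducerPropsSketch is a hypothetical helper name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: the same producer settings expressed as literal property keys.
final class ProducerPropsSketch {

    // Builds producer properties using class names as strings instead of Class objects.
    static Map<String, Object> producerProps(String bootstrapServers, boolean addTypeHeaders) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        // Assumption: literal key behind JsonSerializer.ADD_TYPE_INFO_HEADERS.
        props.put("spring.json.add.type.headers", addTypeHeaders);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps("localhost:9092", true));
    }
}
```

Using the constants from ProducerConfig and JsonSerializer, as the configuration class does, is usually preferable in application code because typos are caught at compile time; the literal form is mainly useful in external property files.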
Consuming JSON Messages with Spring Kafka
Next, we’ll look at how we can receive JSON messages. In the FooListener we simply need to add the Foo Java Object as a parameter in our method.
package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class FooListener {

    private static final Logger LOG = LoggerFactory.getLogger(FooListener.class);

    @KafkaListener(topics = "${app.topic.example}")
    public void receive(@Payload Foo data,
                        @Headers MessageHeaders headers) {
        LOG.info("received data='{}'", data);

        headers.keySet().forEach(key -> {
            LOG.info("{}: {}", key, headers.get(key));
        });
    }
}
The FooListenerConfig is a bit more complex. First we need to add the appropriate Deserializer, which converts a JSON byte[] into a Java Object. To do this, we set ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to the JsonDeserializer class. Next we create a ConsumerFactory and pass it the consumer configuration, the key deserializer and the typed JsonDeserializer<>(Foo.class). Finally, we need to make sure the ConsumerFactory and the ConcurrentKafkaListenerContainerFactory both have the correct value type of Foo.
package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class FooListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Foo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Foo.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Foo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Configure with application.yml
We also create an application.yml properties file, located in the src/main/resources folder. Spring Boot injects these properties into the configuration classes.
spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    example: example.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG
Running with Spring Boot
Finally, we write a simple Spring Boot application to demonstrate the setup. For this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.
package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.FooSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private FooSender sender;

    @Override
    public void run(String... strings) throws Exception {
        Foo foo = new Foo("Spring Kafka", "sending and receiving JSON messages");
        sender.send(foo);
    }
}
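Since the demo depends on a broker listening on localhost:9092, a quick pre-flight check can save some confusion when nothing is received. The following JDK-only sketch merely tests whether a TCP connection to the port can be opened; it does not verify that the process behind the port is actually Kafka. BrokerPortCheck is a hypothetical helper and is not part of the tutorial project.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight helper: checks TCP reachability only, not the Kafka protocol.
final class BrokerPortCheck {

    // Returns true when a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Kafka port reachable: " + isPortOpen("localhost", 9092, 1000));
    }
}
```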
Output
When we run the application we receive the following output.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
No active profile set, falling back to default profiles: default
sending data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}' to topic='example.t'
received data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}'
kafka_offset: 18
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@333d0b64
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: example.t
kafka_receivedTimestamp: 1520332684097
__TypeId__: [99, 111, 109, 46, 109, 101, 109, 111, 114, 121, 110, 111, 116, 102, 111, 117, 110, 100, 46, 107, 97, 102, 107, 97, 46, 70, 111, 111]
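The __TypeId__ header in the output above is logged as a raw byte array because Kafka header values are byte[]. Decoding those bytes as UTF-8 text reveals the type information the JsonSerializer attached to the record. The sketch below does exactly that with the values from the log; TypeIdHeaderDecoder is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: turns the logged header bytes back into readable text.
final class TypeIdHeaderDecoder {

    // Interprets the raw header bytes as UTF-8 text.
    static String decode(byte[] headerValue) {
        return new String(headerValue, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Values copied from the __TypeId__ line of the log output.
        byte[] typeId = {99, 111, 109, 46, 109, 101, 109, 111, 114, 121, 110, 111, 116, 102,
                111, 117, 110, 100, 46, 107, 97, 102, 107, 97, 46, 70, 111, 111};
        System.out.println(decode(typeId)); // prints com.memorynotfound.kafka.Foo
    }
}
```

The decoded value is the fully qualified class name of the payload, which is what lets the consumer side handle more than one type on the same topic.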
References
- Apache Kafka Official Website
- Spring Kafka Client Compatibility
- Spring Kafka Documentation
- Spring Kafka JavaDoc
- Spring Kafka Serialize Deserialize Documentation