Spring Kafka – Forwarding Listener Results using @SendTo
This tutorial demonstrates how to forward listener results with the @SendTo annotation, using Spring Kafka, Spring Boot and Maven. The target topic can be given as a static topic name, an application initialization expression or a runtime expression. Take a look at the following example.
Project Setup
- Spring Kafka: 2.1.4.RELEASE
- Spring Boot: 2.0.0.RELEASE
- Apache Kafka: kafka_2.11-1.0.0
- Maven: 3.5
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies are on the classpath.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>send-to</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <description>Spring Kafka - forwarding listener results using @SendTo</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Sending Messages to Kafka Topic
We use the Constants class as a holder for the Kafka topic names. This way we can refer to a topic by constant wherever we send to it or listen on it.
package com.memorynotfound.kafka;

public class Constants {

    public static final String FOO_TOPIC = "foo.t";
    public static final String BAR_TOPIC = "bar.t";

}
Spring Boot auto-configures the KafkaTemplate using properties from the application.yml property file. We use it to send a message to a Kafka topic.
package com.memorynotfound.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import static com.memorynotfound.kafka.Constants.FOO_TOPIC;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, Double> kafkaTemplate;

    public void send(Double data){
        LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
        kafkaTemplate.send(FOO_TOPIC, data);
    }
}
Forwarding Listener Results using @SendTo
Starting from version 2.0, if you annotate a @KafkaListener method with @SendTo and the method returns a result, that result is forwarded to the topic specified by the @SendTo annotation.
The @SendTo value can take several forms:
- @SendTo("someTopic") specifies a static topic to route to.
- @SendTo("#{someExpression}") specifies an application initialization expression. It is evaluated once, during application context initialization, and the listener result is forwarded to the topic it yields.
- @SendTo("!{someExpression}") specifies a runtime expression. It is evaluated at runtime, when a result is forwarded. The #root object for the evaluation has 3 properties:
  - request – the inbound ConsumerRecord (or ConsumerRecords object for a batch listener).
  - source – the Message<?> converted from the request.
  - result – the method return result.
- @SendTo() (no properties) is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).

The expression must evaluate to a String representing the topic name.

package com.memorynotfound.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

import static com.memorynotfound.kafka.Constants.BAR_TOPIC;
import static com.memorynotfound.kafka.Constants.FOO_TOPIC;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @SendTo(BAR_TOPIC)
    @KafkaListener(topics = FOO_TOPIC)
    public Double calculate(Double data) {
        LOG.info("calculating square root from='{}'", data);
        return Math.sqrt(data);
    }

    @KafkaListener(topics = BAR_TOPIC)
    public void result(Double data) {
        LOG.info("received square root='{}'", data);
    }
}
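The Receiver above only uses the static form. As a rough sketch of the runtime form, the listener below derives the reply topic from the inbound record instead of a constant. The class name RuntimeRoutingReceiver and the ".sqrt" suffix are hypothetical and chosen for illustration only. The sketch is meant to replace the calculate method of Receiver rather than run next to it, and it assumes the derived topic exists or that the broker is allowed to create topics automatically.

```java
package com.memorynotfound.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

import static com.memorynotfound.kafka.Constants.FOO_TOPIC;

// Hypothetical variant of Receiver; the class name and the ".sqrt" suffix
// are illustrative and not part of the original project.
@Service
public class RuntimeRoutingReceiver {

    // Runtime expression: 'request' is the inbound ConsumerRecord, so the
    // result of a record read from "foo.t" is forwarded to "foo.t.sqrt".
    @SendTo("!{request.topic() + '.sqrt'}")
    @KafkaListener(topics = FOO_TOPIC)
    public Double calculate(Double data) {
        return Math.sqrt(data);
    }
}
```

The same pattern applies to the other two root properties: source.headers[...] reads a header of the converted message, and result gives access to the value the method returned.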
Configure Application with application.yml
Spring Boot tries to automatically configure your application with sensible defaults based on the dependencies specified in your pom.xml file. We haven’t configured any Consumer, Producer or KafkaTemplate beans, so Spring Boot auto-configures them using its default values. These values can be overridden using the application.yml property file. You can find more information in the Spring Boot Kafka properties documentation.
We also create an application.yml properties file located in the src/main/resources folder. Spring Boot injects these properties into the configuration classes.
spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
      value-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.DoubleSerializer
      value-serializer: org.apache.kafka.common.serialization.DoubleSerializer

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG
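The DoubleSerializer and DoubleDeserializer configured above exchange each value as a fixed 8-byte, big-endian payload, and the deserializer rejects payloads of any other length. The following plain-JDK sketch mirrors that layout without touching Kafka. DoubleWireFormat is a hypothetical helper written for this illustration, not a Kafka class.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not a Kafka class: it only mirrors the fixed-width layout.
class DoubleWireFormat {

    // Encode a double as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    // Decode 8 big-endian bytes back into a double; reject any other length.
    static double decode(byte[] payload) {
        if (payload.length != Double.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + payload.length);
        }
        return ByteBuffer.wrap(payload).getDouble();
    }

    public static void main(String[] args) {
        byte[] payload = encode(123.123);
        // prints "8 bytes, decoded=123.123"
        System.out.println(payload.length + " bytes, decoded=" + decode(payload));
    }
}
```

This is why the producer and consumer settings have to agree: a value written by one serializer can only be read back by the matching deserializer.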
Running the Application
Finally, we write a simple Spring Boot application to demonstrate the setup. For this demo to work, we need a Kafka server running on localhost on port 9092, which is Kafka's default configuration.
package com.memorynotfound.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send(123.123);
    }
}
Demo
When we run the application we receive the following output.
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)
Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186)
Started SpringKafkaApplication in 3.272 seconds (JVM running for 4.534)
sending data='123.123' to topic='foo.t'
calculating square root from='123.123'
received square root='11.096080389038285'
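The three application log lines follow the path send, calculate, forward, receive. As a way to trace that path without a running broker, here is a small in-memory model of it. ForwardingFlowSketch and its methods are hypothetical names invented for this sketch. It does not use or imitate the Spring Kafka API, it only reproduces the routing rule that a non-null listener result is passed on to the reply topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the broker and the listener container.
class ForwardingFlowSketch {

    // topic -> listener; the returned value is the candidate for forwarding
    private final Map<String, Function<Double, Double>> listeners = new HashMap<>();
    // topic -> reply topic, playing the role of a static @SendTo value
    private final Map<String, String> replyTopics = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    void listen(String topic, String replyTopic, Function<Double, Double> listener) {
        listeners.put(topic, listener);
        if (replyTopic != null) {
            replyTopics.put(topic, replyTopic);
        }
    }

    void send(String topic, Double data) {
        Function<Double, Double> listener = listeners.get(topic);
        if (listener == null) {
            return;
        }
        Double result = listener.apply(data);
        String replyTopic = replyTopics.get(topic);
        // Forward only when the listener returned a value and a reply topic is set.
        if (result != null && replyTopic != null) {
            send(replyTopic, result);
        }
    }

    // Wires the same two listeners as Receiver and returns the collected log lines.
    static List<String> run(double input) {
        ForwardingFlowSketch flow = new ForwardingFlowSketch();
        flow.listen("foo.t", "bar.t", data -> {
            flow.log.add("calculating square root from='" + data + "'");
            return Math.sqrt(data);
        });
        flow.listen("bar.t", null, data -> {
            flow.log.add("received square root='" + data + "'");
            return null;
        });
        flow.log.add("sending data='" + input + "' to topic='foo.t'");
        flow.send("foo.t", input);
        return flow.log;
    }

    public static void main(String[] args) {
        run(123.123).forEach(System.out::println);
    }
}
```

Calling run(123.123) yields the same three messages, in the same order, as the application log above.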