Produce Consume RabbitMQ Spring JSON Message Queue

In this tutorial we explain how to configure RabbitMQ with Spring to produce and consume JSON messages over a queue. It uses Spring Java configuration instead of XML configuration. By the end you will be able to configure a message queue, publish messages to it and listen for those messages.

  1. Download the RabbitMQ binaries.
  2. Add Maven dependencies.
  3. Configure RabbitMQ using Spring.
  4. Create a producer for sending messages.
  5. Create a listener to listen for those messages.

Dependencies

Add the Maven dependencies for Spring, spring-rabbit and Jackson.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.rabbitmq</groupId>
    <artifactId>rabbitmq-spring</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>RabbitMQ - ${project.artifactId}</name>

    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.1.6.RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
    </dependencies>

</project>

Note: we assume you’ve already downloaded RabbitMQ. If not, you can get the binaries from the RabbitMQ website.

Start RabbitMQ

First, start the RabbitMQ server.

./rabbitmq_server-3.5.1/sbin/rabbitmq-server

Message

We are going to send this custom message to the queue. This simple POJO is automatically serialized to JSON when it is sent to the queue.

package com.memorynotfound.rabbitmq;

public class CustomMessage {

    private int id;
    private String name;

    public CustomMessage(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "CustomMessage{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
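
To make the wire format concrete, the following sketch builds by hand the JSON document that this POJO is expected to turn into, matching the payloads printed in the Output section. It does not use Jackson or Spring: JsonShapeSketch, escape and toJson are hypothetical names introduced only for illustration, and in the tutorial the real serialization is done by the message converter registered in the Spring configuration.

```java
public class JsonShapeSketch {

    // Minimal escaping for illustration: only backslashes and double quotes
    // are handled; control characters are not.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Hand-built JSON with the same two fields as CustomMessage (id, name).
    static String toJson(int id, String name) {
        return "{\"id\":" + id + ",\"name\":\"" + escape(name) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"id":1,"name":"RabbitMQ Spring JSON Example"}
        System.out.println(toJson(1, "RabbitMQ Spring JSON Example"));
    }
}
```

The point of the sketch is only the shape of the payload: one JSON object per message, with one property per getter of the POJO.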

RabbitMQ Spring JSON Java Configuration

  • Create and configure a connection factory.
  • Create a Queue named ‘simple.queue.name’.
  • Register a jsonMessageConverter.
  • Create and configure the RabbitTemplate.
  • Create and configure a SimpleMessageListenerContainer.

package com.memorynotfound.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.memorynotfound.rabbitmq")
public class RabbitMqConfig {

    private static final String SIMPLE_MESSAGE_QUEUE = "simple.queue.name";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public Queue simpleQueue() {
        return new Queue(SIMPLE_MESSAGE_QUEUE);
    }

    @Bean
    public MessageConverter jsonMessageConverter(){
        return new JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(SIMPLE_MESSAGE_QUEUE);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueues(simpleQueue());
        listenerContainer.setMessageConverter(jsonMessageConverter());
        listenerContainer.setMessageListener(new Consumer());
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return listenerContainer;
    }

}
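
The RabbitTemplate above sets a routing key but no exchange name, so messages are published to the broker's default exchange, which delivers each message to the queue whose name equals the routing key. That is why the routing key is set to the queue name. The following in-memory sketch models only that routing rule, under the assumption that nothing else about the broker matters here; DefaultExchangeSketch, declareQueue, publish and poll are hypothetical names and are not part of Spring AMQP or the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class DefaultExchangeSketch {

    // Queue name -> pending message bodies (a stand-in for broker queues).
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Declares a queue if it does not exist yet.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Routes the body to the queue whose name equals the routing key.
    // Returns false when no such queue exists, in which case the message is dropped.
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(body);
        return true;
    }

    // Takes the oldest message from the queue, or null if there is none.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        exchange.declareQueue("simple.queue.name");

        // Routing key matches the queue name, so the message is delivered.
        System.out.println(exchange.publish("simple.queue.name", "{\"id\":1}")); // prints true
        // No queue with this name exists, so the message is not routed.
        System.out.println(exchange.publish("other.queue", "{\"id\":2}"));       // prints false
        System.out.println(exchange.poll("simple.queue.name"));                  // prints {"id":1}
    }
}
```

If the routing key in the configuration did not match the name of a declared queue, the producer would appear to send messages successfully while the listener received nothing.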

RabbitMQ Spring Message Listener

In the Spring Java configuration we registered a Consumer class that acts as a message listener and receives the produced messages. We simply print the message body to the console.

package com.memorynotfound.rabbitmq;

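import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.nio.charset.StandardCharsets;

public class Consumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // The body is the raw JSON payload; decode it explicitly as UTF-8
        // instead of relying on the platform default charset.
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));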
    }
}

RabbitMQ Spring JSON Producer

Let’s test our configuration and produce a couple of messages.

package com.memorynotfound.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer {

    public static void main(String[] args) {
        ApplicationContext ctx = new AnnotationConfigApplicationContext(RabbitMqConfig.class);
        RabbitTemplate rabbitTemplate = ctx.getBean(RabbitTemplate.class);

        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 5; i++){
            System.out.println("sending new custom message..");
            rabbitTemplate.convertAndSend(new CustomMessage(counter.incrementAndGet(), "RabbitMQ Spring JSON Example"));
        }
    }

}

Output

INFO: Created new connection: [email protected] [delegate=amqp:[email protected]:5672/]
sending new custom message..
sending new custom message..
sending new custom message..
sending new custom message..
sending new custom message..
{"id":1,"name":"RabbitMQ Spring JSON Example"}
{"id":2,"name":"RabbitMQ Spring JSON Example"}
{"id":3,"name":"RabbitMQ Spring JSON Example"}
{"id":4,"name":"RabbitMQ Spring JSON Example"}
{"id":5,"name":"RabbitMQ Spring JSON Example"}
