SpringBoot RabbitMQ Topic Exchange

In the past post, we had introduced about RabbitMQ Publish/Subcribe pattern with fanout exchange. Today, JavaSampleApproach will show you how to work with SpringBoot RabbitMQ Topic Exchange.

Related posts:
RabbitMQ – How to create Spring RabbitMQ Producer/Consumer applications with SpringBoot
RabbitMQ – How to send/receive Java object messages with Spring RabbitMq | SpringBoot
RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot
SpringBoot RabbitMq Headers Exchange
SpringBoot RabbitMq Exchange to Exchange Topology

I. Technologies

– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMQ

II. RabbitMq Topic Exchange

routing_key of messages sent to a topic exchange must be a list of words, delimited by dots, example:

#.error
*.prod.*
sys.#

Note:
* (star) must be an exactly one word.
# (hash) can be zero or more words.

springboot rabbitmq topic - architecture

With the above topic exchange design,
– when we send a message with routing key: sys.dev.info, it will just be delivered to Q1.
– when we send a message with routing key: app.prod.error, it will just be delivered to Q2.
– when we send a message with routing key: sys.test.error, it will be delivered to both queues {Q1, Q2}.

Topic exchange is strong tool and it can act as other exchanges as below:

– When a queue is bound with “#” (hash) binding key – it is as an fanout exchange.
– When don’t use * & # in bindings, it will behave as a direct exchange.

III. Practices

In the tutorial, we create 2 SpringBoot project as below:

springboot rabbitmq topic -project structure

Step to do:
– Create SpringBoot projects
– Define data model
– Implement RabbitMq Producer
– Implement RabbitMq consumer
– Run and check results

1. Create SpringBoot projects

Using SpringToolSuite, create 2 SpringBoot projects, then add need dependency spring-boot-starter-amqp:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Define data model

Create Log data model for both projects:


package com.javasampleapproach.rabbitmq.model;

public class Log {
	private String content;
	private String routingKey;
	
	public Log(){};
	
	public Log(String content, String routingKey){
		this.content = content;
		this.routingKey = routingKey;
	}
	
	public String getContent(){
		return this.content;
	}
	
	public void setContent(String content){
		this.content = content;
	}
	
	public String getRoutingKey(){
		return this.routingKey;
	}
	
	public void setRoutingKey(String routingKey){
		this.routingKey = routingKey;
	}
	
	@Override
	public String toString() {
		return String.format("{content = %s, routingKey = %s}", content, routingKey);
	}
}

3. Implement RabbitMq producer

3.1 Configure RabbitMq producer


package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
	
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
jsa.rabbitmq.queue=jsa.queue
jsa.rabbitmq.routingkey=jsa.routingkey

3.2 Implement RabbitMq producer


package com.javasampleapproach.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.javasampleapproach.rabbitmq.model.Log;

@Component
public class Producer {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	@Value("${jsa.rabbitmq.exchange}")
	private String exchange;
	
	public void produce(Log logs){
		String routingKey = logs.getRoutingKey();
		amqpTemplate.convertAndSend(exchange, routingKey, logs);
		System.out.println("Send msg = " + logs);
	}
}

3.3 Implement RabbitMqProducer client


package com.javasampleapproach.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.model.Log;
import com.javasampleapproach.rabbitmq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqProducerApplication  implements CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
	}
	
	@Autowired
	Producer producer;

	@Override
	public void run(String... args) throws Exception {
		
		/**
		 *  1
		 */
		String content = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
		String routingKey = "sys.dev.info";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
		
		/**
		 *  2
		 */
		content = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
		routingKey = "sys.test.error";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
		
		/**
		 *  3
		 */
		content = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
		routingKey = "app.prod.error";
		
		// send to RabbitMQ
		producer.produce(new Log(content, routingKey));
	}
}

4. Implement RabbitMq consumer

4.1 Configure RabbitMq consumer


package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
	
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.logs.sys
#jsa.rabbitmq.queue=jsa.logs.prod.error

4.2 Implement consumer


package com.javasampleapproach.rabbitmq.consumer;

import org.apache.commons.logging.Log;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
	
	@RabbitListener(queues="${jsa.rabbitmq.queue}", containerFactory="jsaFactory")
    public void recievedMessage(Log logs) {
        System.out.println("Recieved Message: " + logs);
    }
}

5. Run and check results

5.1 Setup RabbitMq exchange, queues

Enable rabbitmq_management by cmd: rabbitmq-plugins enable rabbitmq_management --online
Then go to: http://localhost:15672 -> login with user/password: guest/guest

springboot rabbitmq topic - connection

Add exchange:
Go to http://localhost:15672/#/exchanges, add exchange: jsa.exchange.logs

springboot rabbitmq topic - create exchange

Add queue:
Go to http://localhost:15672/#/queues, add 2 queues: jsa.logs.sys, jsa.logs.prod.error.

springboot rabbitmq topic - 2 queue

– Binding the queues with above exchange:

springboot rabbitmq topic - exchange bindling

5.2 Run & check results

– Run SpringBootRabbitMqProducer with commandline mvn spring-boot:run,

-> Console’s logs:


Send msg = {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Send msg = {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Send msg = {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

-> See queues’ status:

springboot rabbitmq topic - message queue after sending

– Run SpringBootRabbitMqConsumer which listen to jsa.logs.sys queue with configuration: jsa.rabbitmq.queue=jsa.logs.sys:

-> Console’s logs:


Recieved Message: {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}

-> See queues’s status:

springboot rabbitmq topic - aftern consume in jsa.logs.sys

– Run SpringBootRabbitMqConsumer which listen to jsa.logs.prod.error queue with configuration: jsa.rabbitmq.queue=jsa.logs.prod.error:

-> Console’s logs:


Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Recieved Message: {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

-> See queues’s status:

IV. Sourcecode

SpringRabbitMqProducer
SpringRabbitMqConsumer



By grokonez | October 10, 2017.

Last updated on May 5, 2021.



Related Posts


5 thoughts on “SpringBoot RabbitMQ Topic Exchange”

  1. Whe n i try to run the code, i am gtting the following error :
    Bean [com.javasampleapproach.rabbitmq.consumer.Consumer@3b9e56db]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:848) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

    1. import com.javasampleapproach.rabbitmq.model.Log;
      instead of
      import org.apache.commons.logging.Log;
      in class Consumer{}

  2. Hi, I want to ask why the queue name of producer is “jsa.rabbitmq.queue=jsa.queue”, but queue name of consumer is “jsa.rabbitmq.queue=jsa.logs.sys”? I’m thinking that they (producer & consumer) have to use a same queue to communicate, is that correct, or they communicate by 2 separate queues?
    Thank so much,
    Lucas

  3. I have a use case where the client exposed their topic exchange and routingKeys, I just need to listen the messages and have no rights to create queues. In this example, we are binding queues to routingKeys, but in my case, I have only topic exchange and routingKeys. Please suggest how I can make the connection and get the data?

Got Something To Say:

Your email address will not be published. Required fields are marked *

*