In the previous post, we introduced the RabbitMQ Publish/Subscribe pattern with a fanout exchange. Today, JavaSampleApproach will show you how to work with a SpringBoot RabbitMQ Topic Exchange.
Related posts:
– RabbitMQ – How to create Spring RabbitMQ Producer/Consumer applications with SpringBoot
– RabbitMQ – How to send/receive Java object messages with Spring RabbitMq | SpringBoot
– RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot
– SpringBoot RabbitMq Headers Exchange
– SpringBoot RabbitMq Exchange to Exchange Topology
I. Technologies
– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMQ
II. RabbitMq Topic Exchange
The routing_key of a message sent to a topic exchange must be a list of words delimited by dots. Binding keys take the same form and may also contain wildcards, for example:
– #.error
– *.prod.*
– sys.#
Note:
* (star) matches exactly one word.
# (hash) matches zero or more words.
With a topic exchange design where Q1 is bound with sys.# and Q2 is bound with #.error and *.prod.*,
– when we send a message with routing key sys.dev.info, it is delivered only to Q1.
– when we send a message with routing key app.prod.error, it is delivered only to Q2.
– when we send a message with routing key sys.test.error, it is delivered to both queues {Q1, Q2}.
– When a queue is bound with the “#” (hash) binding key, it receives every message, so the exchange behaves like a fanout exchange for that queue.
– When the bindings do not use * or #, the topic exchange behaves like a direct exchange.
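The matching rules above can be sketched in plain Java, without a broker. This is only an illustration of the semantics, not RabbitMQ's actual implementation. The class name TopicMatchDemo, the helper methods, and the bindings (Q1 with sys.#, Q2 with #.error and *.prod.*) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicMatchDemo {

    // Assumed bindings for this sketch: queue name -> binding keys.
    static final Map<String, List<String>> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("Q1", Arrays.asList("sys.#"));
        BINDINGS.put("Q2", Arrays.asList("#.error", "*.prod.*"));
    }

    /** Returns true if the dot-delimited routing key matches the binding key. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it is a match only if all words were consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= words.length; next++) {
                if (matches(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // a literal word or '*' needs exactly one word
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return matches(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    /** Returns the queues that have at least one binding key matching the routing key. */
    static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : BINDINGS.entrySet()) {
            for (String bindingKey : entry.getValue()) {
                if (matches(bindingKey, routingKey)) {
                    queues.add(entry.getKey());
                    break; // a queue gets the message once, even if several keys match
                }
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        for (String key : Arrays.asList("sys.dev.info", "app.prod.error", "sys.test.error")) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

Under these assumed bindings, the program prints sys.dev.info -> [Q1], app.prod.error -> [Q2] and sys.test.error -> [Q1, Q2], which agrees with the three cases listed above.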
III. Practices
In this tutorial, we create 2 SpringBoot projects: a RabbitMq producer and a RabbitMq consumer.
Steps to do:
– Create SpringBoot projects
– Define data model
– Implement RabbitMq Producer
– Implement RabbitMq consumer
– Run and check results
1. Create SpringBoot projects
Using SpringToolSuite, create 2 SpringBoot projects, then add the needed dependency spring-boot-starter-amqp:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Define data model
Create Log data model for both projects:
package com.javasampleapproach.rabbitmq.model;

public class Log {
    private String content;
    private String routingKey;

    // No-arg constructor, needed for JSON deserialization
    public Log() {
    }

    public Log(String content, String routingKey) {
        this.content = content;
        this.routingKey = routingKey;
    }

    public String getContent() {
        return this.content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    @Override
    public String toString() {
        return String.format("{content = %s, routingKey = %s}", content, routingKey);
    }
}
3. Implement RabbitMq producer
3.1 Configure RabbitMq producer
package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    // Serialize messages as JSON
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Registered as a bean so that this template (with the JSON converter) is the one injected
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}
Open application.properties, add configuration:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
jsa.rabbitmq.queue=jsa.queue
jsa.rabbitmq.routingkey=jsa.routingkey
3.2 Implement RabbitMq producer
package com.javasampleapproach.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.javasampleapproach.rabbitmq.model.Log;

@Component
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${jsa.rabbitmq.exchange}")
    private String exchange;

    public void produce(Log logs) {
        // The routing key travels with each Log and decides which queues receive it
        String routingKey = logs.getRoutingKey();
        amqpTemplate.convertAndSend(exchange, routingKey, logs);
        System.out.println("Send msg = " + logs);
    }
}
3.3 Implement RabbitMqProducer client
package com.javasampleapproach.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.model.Log;
import com.javasampleapproach.rabbitmq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
    }

    @Autowired
    Producer producer;

    @Override
    public void run(String... args) throws Exception {
        /**
         * 1
         */
        String content = "2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52";
        String routingKey = "sys.dev.info";

        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));

        /**
         * 2
         */
        content = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
        routingKey = "sys.test.error";

        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));

        /**
         * 3
         */
        content = "2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception";
        routingKey = "app.prod.error";

        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
    }
}
4. Implement RabbitMq consumer
4.1 Configure RabbitMq consumer
package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    // Deserialize JSON messages back into Java objects
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Listener container factory referenced by @RabbitListener(containerFactory="jsaFactory")
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}
Open application.properties, add configuration:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.logs.sys
#jsa.rabbitmq.queue=jsa.logs.prod.error
4.2 Implement consumer
package com.javasampleapproach.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Use the tutorial's own Log model, not org.apache.commons.logging.Log
import com.javasampleapproach.rabbitmq.model.Log;

@Component
public class Consumer {

    @RabbitListener(queues="${jsa.rabbitmq.queue}", containerFactory="jsaFactory")
    public void recievedMessage(Log logs) {
        System.out.println("Recieved Message: " + logs);
    }
}
5. Run and check results
5.1 Setup RabbitMq exchange, queues
Enable the rabbitmq_management plugin with the command: rabbitmq-plugins enable rabbitmq_management --online
Then go to http://localhost:15672 and log in with user/password: guest/guest
– Add exchange:
Go to http://localhost:15672/#/exchanges, add exchange jsa.exchange.logs with type topic.
– Add queue:
Go to http://localhost:15672/#/queues, add 2 queues: jsa.logs.sys, jsa.logs.prod.error.
– Bind the queues to the above exchange: jsa.logs.sys with binding key sys.#, and jsa.logs.prod.error with binding keys #.error and *.prod.*
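As an alternative to the management UI, the same topology could be declared from the consumer application with Spring AMQP. The following is a minimal sketch, not part of the original projects: the class name TopologyConfig and the bean method names are hypothetical, and the binding keys are an assumption inferred from the results in section 5.2. It relies on Spring Boot's auto-configured AmqpAdmin to declare these beans on the broker when the first connection is opened, and it requires a user that is allowed to create exchanges, queues and bindings.

```java
package com.javasampleapproach.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class TopologyConfig {

    @Bean
    public TopicExchange logsExchange() {
        // Durable, non-auto-delete topic exchange
        return new TopicExchange("jsa.exchange.logs");
    }

    @Bean
    public Queue sysQueue() {
        return new Queue("jsa.logs.sys"); // durable queue
    }

    @Bean
    public Queue prodErrorQueue() {
        return new Queue("jsa.logs.prod.error"); // durable queue
    }

    // Assumed binding: everything whose routing key starts with "sys"
    @Bean
    public Binding sysBinding() {
        return BindingBuilder.bind(sysQueue()).to(logsExchange()).with("sys.#");
    }

    // Assumed binding: every routing key ending with "error"
    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(prodErrorQueue()).to(logsExchange()).with("#.error");
    }

    // Assumed binding: three-word routing keys whose middle word is "prod"
    @Bean
    public Binding prodBinding() {
        return BindingBuilder.bind(prodErrorQueue()).to(logsExchange()).with("*.prod.*");
    }
}
```

If an exchange or queue with the same name already exists with different properties (for example a non-durable queue), the broker rejects the redeclaration, so the properties here have to match whatever was created in the UI.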
5.2 Run & check results
– Run SpringBootRabbitMqProducer with the command line mvn spring-boot:run
-> Console’s logs:
Send msg = {content = 2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Send msg = {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Send msg = {content = 2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}
-> See queues’ status:
– Run SpringBootRabbitMqConsumer, which listens to the jsa.logs.sys queue, with configuration jsa.rabbitmq.queue=jsa.logs.sys:
-> Console’s logs:
Recieved Message: {content = 2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
-> See queues’ status:
– Run SpringBootRabbitMqConsumer, which listens to the jsa.logs.prod.error queue, with configuration jsa.rabbitmq.queue=jsa.logs.prod.error:
-> Console’s logs:
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Recieved Message: {content = 2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}
-> See queues’ status:
IV. Sourcecode
SpringRabbitMqProducer
SpringRabbitMqConsumer
When I try to run the code, I am getting the following error:
Bean [com.javasampleapproach.rabbitmq.consumer.Consumer@3b9e56db]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:129) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:848) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
You can follow the guide in the video below:
https://youtu.be/P6UUAEzjrFs
Use
import com.javasampleapproach.rabbitmq.model.Log;
instead of
import org.apache.commons.logging.Log;
in the Consumer class.
Hi, I want to ask why the queue name in the producer is “jsa.rabbitmq.queue=jsa.queue”, but the queue name in the consumer is “jsa.rabbitmq.queue=jsa.logs.sys”? I thought the producer and consumer have to use the same queue to communicate. Is that correct, or do they communicate through 2 separate queues?
Thanks so much,
Lucas
I have a use case where the client exposes their topic exchange and routing keys. I just need to listen to the messages and have no rights to create queues. In this example, we bind queues to routing keys, but in my case I have only the topic exchange and the routing keys. Please suggest how I can make the connection and get the data.