In the previous post, we introduced the RabbitMQ Topic Exchange. Today, JavaSampleApproach will show you how to work with a SpringBoot RabbitMQ Headers Exchange.
Related posts:
– SpringBoot RabbitMQ Topic Exchange
– RabbitMQ – How to create Spring RabbitMQ Producer/Consumer applications with SpringBoot
– RabbitMq – How to create Spring RabbitMq Publish/Subscribe pattern with SpringBoot
– RabbitMQ – How to send/receive Java object messages with Spring RabbitMq | SpringBoot
– SpringBoot RabbitMq Exchange to Exchange Topology
I. Technologies
– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMQ
II. RabbitMq Headers Exchange
A headers exchange routes messages based on header values instead of routing keys.
A special binding argument named x-match has 2 values {all, any}, where all is the default value of a headers binding.
– x-match = all means that all the values must match.
– x-match = any means just one matching header value is sufficient.
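Scenarios with a design in which queue Q1 (jsa.queue.logs.all-sys-error) is bound with {x-match=all, layer=system, level=error} and queue Q2 (jsa.queue.logs.any-app-error) is bound with {x-match=any, layer=application, level=error}:
– When sending a message with headers {layer=system, level=error}, the message will be delivered to 2 queues {Q1, Q2}.
– When sending a message with headers {layer=application, level=error}, the message will be delivered to one queue, Q2.
– When sending a message with headers {layer=system, level=info}, the message will be discarded.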
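The three scenarios can be checked with a small plain-Java sketch of the matching rule. It is not RabbitMQ's implementation, only an illustration. The class and method names are hypothetical, and the bindings are assumptions inferred from the scenarios and queue names: Q1 is taken to be bound with x-match = all on {layer=system, level=error}, and Q2 with x-match = any on {layer=application, level=error}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of headers-exchange matching (hypothetical names, not a RabbitMQ API).
class HeadersMatchSketch {

    // True when the message headers satisfy the binding headers under the given x-match mode.
    static boolean matches(String xMatch, Map<String, String> binding, Map<String, String> message) {
        int hits = 0;
        for (Map.Entry<String, String> e : binding.entrySet()) {
            if (e.getValue().equals(message.get(e.getKey()))) {
                hits++;
            }
        }
        // "any": one matching header is enough; otherwise ("all") every binding header must match.
        return "any".equals(xMatch) ? hits > 0 : hits == binding.size();
    }

    // Small helper that builds a header map with the two keys used in this tutorial.
    static Map<String, String> headers(String layer, String level) {
        Map<String, String> h = new LinkedHashMap<>();
        h.put("layer", layer);
        h.put("level", level);
        return h;
    }

    // Returns the queues that would receive the message under the assumed bindings.
    static List<String> route(Map<String, String> message) {
        List<String> queues = new ArrayList<>();
        if (matches("all", headers("system", "error"), message)) {
            queues.add("Q1"); // assumed binding: x-match=all, layer=system, level=error
        }
        if (matches("any", headers("application", "error"), message)) {
            queues.add("Q2"); // assumed binding: x-match=any, layer=application, level=error
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(route(headers("system", "error")));      // [Q1, Q2]
        System.out.println(route(headers("application", "error"))); // [Q2]
        System.out.println(route(headers("system", "info")));       // [] (discarded)
    }
}
```

An empty result corresponds to the discarded message: no binding matched, so the exchange has nowhere to route it.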
III. Practices
In this tutorial, we create 2 SpringBoot projects: SpringBootRabbitMqProducer and SpringBootRabbitMqConsumer.
Steps to do:
– Create SpringBoot projects
– Implement RabbitMq producer
– Implement RabbitMq consumer
– Run and check results
1. Create SpringBoot projects
Using SpringToolSuite, create 2 SpringBoot projects, then add the needed dependency spring-boot-starter-amqp:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
2. Implement RabbitMq producer
2.1 Configure RabbitMq connection
Open application.properties file, add configuration:
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    jsa.rabbitmq.exchange=jsa.exchange.logs.headers
2.2 Implement Producer
    package com.javasampleapproach.rabbitmq.producer;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    @Component
    public class Producer {

        @Autowired
        private AmqpTemplate amqpTemplate;

        // Name of the headers exchange, read from application.properties
        @Value("${jsa.rabbitmq.exchange}")
        private String exchange;

        public void produce(Message message) {
            /**
             * void send(String exchange, String routingKey, Message message) throws AmqpException;
             * A headers exchange ignores the routing key, so we set it to an empty String.
             */
            amqpTemplate.send(exchange, "", message);
            System.out.println("Send msg = " + new String(message.getBody()));
        }
    }
2.3 Implement Producer client
    package com.javasampleapproach.rabbitmq;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    import com.javasampleapproach.rabbitmq.producer.Producer;

    @SpringBootApplication
    public class SpringRabbitMqProducerApplication implements CommandLineRunner {

        public static void main(String[] args) {
            SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
        }

        @Autowired
        Producer producer;

        @Override
        public void run(String... args) throws Exception {
            /**
             * message 1
             *
             * This message will be delivered to 2 queues
             * {
             *   jsa.queue.logs.all-sys-error
             *   jsa.queue.logs.any-app-error
             * }
             */
            String systemErrorLog = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
            Message sysErrMsg = MessageBuilder.withBody(systemErrorLog.getBytes())
                    .setHeader("layer", "system")
                    .setHeader("level", "error")
                    .build();
            producer.produce(sysErrMsg);

            /**
             * message 2
             *
             * This message will be delivered to one queue: jsa.queue.logs.any-app-error
             */
            String appErrorLog = "2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception";
            Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
                    .setHeader("layer", "application")
                    .setHeader("level", "error")
                    .build();
            producer.produce(appErrMsg);

            /**
             * message 3
             *
             * => This message will be discarded because it does NOT match the binding conditions of the 2 queues
             * {
             *   jsa.queue.logs.all-sys-error
             *   jsa.queue.logs.any-app-error
             * }
             */
            String sysInfoLog = "2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52";
            Message sysInfoMsg = MessageBuilder.withBody(sysInfoLog.getBytes())
                    .setHeader("layer", "system")
                    .setHeader("level", "info")
                    .build();
            producer.produce(sysInfoMsg);
        }
    }
3. Implement RabbitMq consumer
3.1 Configure RabbitMq connection
Open application.properties file, add configuration:
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    jsa.rabbitmq.queue=jsa.queue.logs.all-sys-error
    #jsa.rabbitmq.queue=jsa.queue.logs.any-app-error
3.2 Implement Consumer
    package com.javasampleapproach.rabbitmq.consumer;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Consumer {

        // Listens to the queue configured by jsa.rabbitmq.queue and prints each message body
        @RabbitListener(queues = "${jsa.rabbitmq.queue}")
        public void recievedMessage(byte[] log) {
            System.out.println("Recieved Message: " + new String(log));
        }
    }
4. Run and check results
4.1 Setup RabbitMq exchange, queues
Enable rabbitmq_management with the command: rabbitmq-plugins enable rabbitmq_management --online
Then go to http://localhost:15672 and log in with user/password: guest/guest.
– Add exchange: go to http://localhost:15672/#/exchanges and add the exchange jsa.exchange.logs.headers with type headers.
– Add queues: go to http://localhost:15672/#/queues and add 2 queues: jsa.queue.logs.all-sys-error and jsa.queue.logs.any-app-error.
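– Bind the queues to the above exchange. Following the scenarios in section II: jsa.queue.logs.all-sys-error with arguments {x-match=all, layer=system, level=error}, and jsa.queue.logs.any-app-error with arguments {x-match=any, layer=application, level=error}.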
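As an alternative to the management UI, the same topology could be declared from code. This is a minimal sketch using Spring AMQP's BindingBuilder, not part of the original projects. The package, class name and bean names are hypothetical, and the binding headers are assumptions inferred from the scenarios in section II. It relies on Spring Boot's auto-configured RabbitAdmin, which declares these beans on the broker when the application first opens a connection.

```java
package com.javasampleapproach.rabbitmq.config; // hypothetical package

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HeadersExchangeConfig {

    @Bean
    public HeadersExchange logsExchange() {
        return new HeadersExchange("jsa.exchange.logs.headers");
    }

    @Bean
    public Queue allSysErrorQueue() {
        return new Queue("jsa.queue.logs.all-sys-error");
    }

    @Bean
    public Queue anyAppErrorQueue() {
        return new Queue("jsa.queue.logs.any-app-error");
    }

    // Assumed binding: x-match=all, so both layer=system and level=error must match.
    @Bean
    public Binding allSysErrorBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("layer", "system");
        headers.put("level", "error");
        return BindingBuilder.bind(allSysErrorQueue()).to(logsExchange()).whereAll(headers).match();
    }

    // Assumed binding: x-match=any, so either layer=application or level=error is enough.
    @Bean
    public Binding anyAppErrorBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("layer", "application");
        headers.put("level", "error");
        return BindingBuilder.bind(anyAppErrorQueue()).to(logsExchange()).whereAny(headers).match();
    }
}
```

If the exchange and queues were already created by hand, the declarations only succeed when their properties (for example durability) match the existing ones, so it is simplest to use one approach or the other.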
4.2 Run & check results
– Run SpringBootRabbitMqProducer with the command line mvn spring-boot:run,
-> Console’s logs:
    Send msg = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]
    Send msg = 2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception
    Send msg = 2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52
-> See queues’ status:
-> The jsa.queue.logs.all-sys-error queue has 1 message.
-> The jsa.queue.logs.any-app-error queue has 2 messages.
– Run SpringBootRabbitMqConsumer, which listens to the jsa.queue.logs.all-sys-error queue, with configuration jsa.rabbitmq.queue=jsa.queue.logs.all-sys-error:
-> Console’s logs:
    Recieved Message: 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]
– Run SpringBootRabbitMqConsumer, which listens to the jsa.queue.logs.any-app-error queue, with configuration jsa.rabbitmq.queue=jsa.queue.logs.any-app-error:
-> Console’s logs:
    Recieved Message: 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]
    Recieved Message: 2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception