In this tutorial, JavaSampleApproach shows you how to work with RabbitMQ queue durability and persistent message delivery.
Related posts:
– RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot
I. Technologies
– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMq
II. RabbitMQ Queue Durability and Persistent Message Delivery
A RabbitMQ queue has one of 2 durability types: {Durable, Transient}.
– A durable queue survives a broker restart: the broker re-declares it on startup and recovers the persistent messages it held. Non-persistent messages are lost.
– A transient queue is deleted, together with its messages, when the broker restarts.
Scenario with an exchange X bound to a durable queue Q1 and a transient queue Q2:
– When 2 messages {a persistent message, a transient message} are sent to exchange X, both messages are delivered to both queues {Q1, Q2}.
– Stop the RabbitMQ service, then restart it -> only the persistent message is recovered, in durable queue Q1.
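The scenario above can be mimicked with a small in-memory model. This is only a sketch of the restart rules, not the RabbitMQ client API: the class DurabilityModel and all of its methods are hypothetical names made up for illustration, and the fanout behaviour is reduced to "every declared queue gets a copy".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of restart semantics. This is NOT the RabbitMQ client API;
// every class and method name here is hypothetical.
public class DurabilityModel {

    /** A message reduced to its body and a persistent/non-persistent flag. */
    public static final class Msg {
        final String body;
        final boolean persistent;

        public Msg(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    /** A queue reduced to a durability flag and its pending messages. */
    private static final class ToyQueue {
        final boolean durable;
        final List<Msg> messages = new ArrayList<>();

        ToyQueue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, ToyQueue> queues = new LinkedHashMap<>();

    public void declareQueue(String name, boolean durable) {
        queues.put(name, new ToyQueue(durable));
    }

    /** Fanout-style publish: every declared queue receives the message. */
    public void publish(Msg msg) {
        for (ToyQueue queue : queues.values()) {
            queue.messages.add(msg);
        }
    }

    /** Simulated broker restart. */
    public void restart() {
        // Transient queues disappear together with their messages.
        queues.values().removeIf(queue -> !queue.durable);
        // Durable queues come back, but keep only persistent messages.
        for (ToyQueue queue : queues.values()) {
            queue.messages.removeIf(msg -> !msg.persistent);
        }
    }

    public boolean hasQueue(String name) {
        return queues.containsKey(name);
    }

    /** Number of pending messages, or 0 if the queue does not exist. */
    public int depth(String name) {
        ToyQueue queue = queues.get(name);
        return queue == null ? 0 : queue.messages.size();
    }

    public static void main(String[] args) {
        DurabilityModel broker = new DurabilityModel();
        broker.declareQueue("Q1", true);   // durable
        broker.declareQueue("Q2", false);  // transient
        broker.publish(new Msg("persistent message", true));
        broker.publish(new Msg("transient message", false));
        System.out.println("Before restart: Q1=" + broker.depth("Q1") + ", Q2=" + broker.depth("Q2"));
        broker.restart();
        System.out.println("After restart: Q1=" + broker.depth("Q1") + ", Q2 exists=" + broker.hasQueue("Q2"));
    }
}
```

Running main prints Q1=2, Q2=2 before the simulated restart, and Q1=1 with Q2 no longer existing afterwards, which matches the scenario.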
With Spring AMQP, the default message delivery mode is PERSISTENT:
package org.springframework.amqp.core;
...
public class MessageProperties implements Serializable {
    ...
    public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    ...
}
We can change the delivery mode as below:
...
Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
        .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
        .build();
...
III. Practice
In this tutorial, we create 2 Spring Boot projects: SpringRabbitMqPublisher and SpringRabbitMqSubcriber.
Steps:
– Create Spring Boot projects
– Implement RabbitMQ Producer
– Implement RabbitMQ Consumer
– Set up RabbitMQ exchange and queues
– Run and check results
1. Create Spring Boot projects
Using Spring Tool Suite, create 2 Spring Boot projects, then add the required dependency spring-boot-starter-amqp:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Implement RabbitMQ Producer
– Open the application.properties file and add the spring.rabbitmq.* settings:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
– Implement Publisher:
package com.javasampleapproach.rabbitmq.publisher;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Publisher {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${jsa.rabbitmq.exchange}")
    private String exchange;

    public void doPublish(Message msg) {
        // Fanout exchange: the routing key is ignored, so an empty string is used
        amqpTemplate.send(exchange, "", msg);
        System.out.println("Send msg = " + new String(msg.getBody()));
    }
}
– Implement the RabbitMQ Publisher client:
package com.javasampleapproach.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.publisher.Publisher;

@SpringBootApplication
public class SpringRabbitMqPublisherApplication implements CommandLineRunner {

    @Autowired
    Publisher publisher;

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitMqPublisherApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        /**
         * message 1
         *
         * PERSISTENT: recovered in the durable queue after a broker restart
         */
        String systemErrorLog = "2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52";
        Message sysErrMsg = MessageBuilder.withBody(systemErrorLog.getBytes())
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();

        // send to RabbitMQ
        publisher.doPublish(sysErrMsg);

        /**
         * message 2
         *
         * NON_PERSISTENT: delivered to both queues, but lost on a broker restart
         */
        String appErrorLog = "2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception";
        Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                .build();

        // send to RabbitMQ
        publisher.doPublish(appErrMsg);
    }
}
3. Implement RabbitMQ Consumer
– Open the application.properties file and add the spring.rabbitmq.* settings:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.queue.durable.logs
– Implement Subcriber:
package com.javasampleapproach.rabbitmq.subcriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Subcriber {

    // Listens on the queue configured by jsa.rabbitmq.queue
    @RabbitListener(queues = "${jsa.rabbitmq.queue}")
    public void recievedMessage(byte[] log) {
        System.out.println("Recieved Message: " + new String(log));
    }
}
4. Set up RabbitMQ exchange and queues
Start the RabbitMQ service. Then,
– Add a fanout exchange jsa.exchange.logs
– Add 2 queues: a durable queue jsa.queue.durable.logs and a transient queue jsa.queue.transient.logs
– Bind both queues to the exchange
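As an alternative to creating the exchange, queues and bindings by hand, the same topology can be declared from Spring. The following is a minimal sketch, assuming it is added to one of the two projects (which already have spring-boot-starter-amqp) and that Spring Boot's auto-configured RabbitAdmin declares these beans when the first connection to the broker opens. The class name RabbitTopologyConfig and the bean method names are hypothetical; the exchange and queue names are the ones used in this tutorial.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class: declares the tutorial's exchange, queues and bindings.
@Configuration
public class RabbitTopologyConfig {

    @Bean
    public FanoutExchange logsExchange() {
        return new FanoutExchange("jsa.exchange.logs");
    }

    @Bean
    public Queue durableLogsQueue() {
        // second argument: durable = true, the queue survives a broker restart
        return new Queue("jsa.queue.durable.logs", true);
    }

    @Bean
    public Queue transientLogsQueue() {
        // durable = false, the queue is removed when the broker restarts
        return new Queue("jsa.queue.transient.logs", false);
    }

    @Bean
    public Binding durableLogsBinding() {
        // a fanout binding needs no routing key
        return BindingBuilder.bind(durableLogsQueue()).to(logsExchange());
    }

    @Bean
    public Binding transientLogsBinding() {
        return BindingBuilder.bind(transientLogsQueue()).to(logsExchange());
    }
}
```

If a queue with the same name already exists on the broker with different attributes, the declaration is expected to be rejected, so use either the manual setup or this configuration, not a mix of both with conflicting settings.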
5. Run and check results
– Run SpringRabbitMqPublisher from the command line with mvn spring-boot:run,
-> Console logs:
Send msg = 2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52
Send msg = 2017-10-10 10:57:51.112 ERROR java.lang.Exception: java.lang.Exception
– Check the status of the queues,
-> Each of them has 2 messages.
– Stop the RabbitMQ service, then restart it,
-> Only the durable queue jsa.queue.durable.logs remains, and it holds only the 1 persistent message.
– Run SpringRabbitMqSubcriber from the command line with mvn spring-boot:run,
-> Console logs:
Recieved Message: 2014-03-05 10:58:51.1 INFO 45469 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet Engine: Apache Tomcat/7.0.52