RabbitMq Queue Durability and Persistent MessageDelivery | SpringBoot

In the tutorial, JavaSampleApproach will show you how to work with RabbitMq Queue Durability and Persistent MessageDelivery.

Related posts:
RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot

I. Technologies

– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMq

II. RabbitMq Queue Durability and Persistent MessageDelivery

RabbitMq Queue has 2 kind of durability: {Durable, Transient}.
Durable queue is a solution for re-declared persistent messages during broker restarts if broker is taken down.

springboot rabbitmq durable queue - architecture

Scenarios with above design:
– When sending 2 messages {a persistent message, a trasient message} to exchange X, all messages will be
delivered to 2 queues {Q1, Q2}.
– Stop RabbitMq service, then re-start it -> just a persistent message will be re-declared in durable queue Q1.

With Spring framework, default message is PERSISTENT mode:


package org.springframework.amqp.core;
...

public class MessageProperties implements Serializable {
	...
	
	public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
	...
	
}

We can change the delivery mode as below:


...

Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
									.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
									.build();
...

III. Practice

In the tutorial, we create 2 SpringBoot project as below:

springboot rabbitmq exchage headers - project structures

Step to do:
– Create SpringBoot projects
– Implement RabbitMq Producer
– Implement RabbitMq Consumer
– Setup RabbitMq exchange, queues
– Run and check results

1. Create SpringBoot projects

Using SpringToolSuite, create 2 SpringBoot projects, then add need dependency spring-boot-starter-amqp:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Implement RabbitMq Producer

– Open application.properties file, add spring.rabbitmq.*:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs

– Implement Publisher:


package com.javasampleapproach.rabbitmq.publisher;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Publisher {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	@Value("${jsa.rabbitmq.exchange}")
	private String exchange;
	
	public void doPublish(Message msg){
		amqpTemplate.send(exchange, "",msg);
		System.out.println("Send msg = " + new String(msg.getBody()));
	}
}

– Implement RabbitMq Publisher client:


package com.javasampleapproach.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.publisher.Publisher;

@SpringBootApplication
public class SpringRabbitMqPublisherApplication implements CommandLineRunner{

	@Autowired
	Publisher publisher;
	
	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqPublisherApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		/**
		 *  1
		 */
		String systemErrorLog = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
		
		Message sysErrMsg = MessageBuilder.withBody(systemErrorLog.getBytes())
											.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
											.build();
		
		// send to RabbitMQ
		publisher.doPublish(sysErrMsg);
		

		/**
		 *  message 2
		 *  
		 *  This message will be delivered to one queue: jsa.queue.logs.any-app-error
		 */
		String appErrorLog = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
		
		Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
											.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
											.build();
		
		// send to RabbitMQ
		publisher.doPublish(appErrMsg);
	}
}

3. Implement RabbitMq Consumer

– Open application.properties file, add spring.rabbitmq.*:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.queue.durable.logs

– Implement Subcribe:


package com.javasampleapproach.rabbitmq.subcriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Subcriber {

	@RabbitListener(queues="${jsa.rabbitmq.queue}")
    public void recievedMessage(byte[] log) {
		System.out.println("Recieved Message: " + new String(log));
    }
}

4. Setup RabbitMq exchange, queues

Start RabbitMQ service. Then,

– Add fanout exchange jsa.exchange.logs:

springboot rabbitmq durable queue - create exchange

– Add 2 queues a durable queue jsa.queue.durable.logs, and a trasient queue jsa.queue.transient.logs

springboot rabbitmq durable queue - create durable queue

springboot rabbitmq durable queue - create transient queue

– Binding the queues & exchanges:

springboot rabbitmq durable queue - binding queue with exchange

5. Run and check results

– Run SpringRabbitMqPublisher with commandline mvn spring-boot:run,

-> Console’s logs:


Send msg = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52
Send msg = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception

– Check status of Queues,

-> Both of them have 2 messages.

springboot rabbitmq durable queue - message in queues after sending

– Stop RabbitMq service. Then re-start it,

-> Now remain only one durable queue jsa.queue.durable.logs which only 1 persistent message:

springboot rabbitmq exchage headers - remain 1 queue

springboot rabbitmq exchage headers - remain 1 message

– Run SpringRabbitMqSubcriber with commandline mvn spring-boot:run,

-> Console’s logs:


Recieved Message: 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52

IV. Sourcecode

SpringRabbitMqSubcriber
SpringRabbitMqPublisher



By grokonez | October 16, 2017.

Last updated on May 6, 2021.



Related Posts


Got Something To Say:

Your email address will not be published. Required fields are marked *

*