SpringBoot RabbitMq Headers Exchange

In the previous post, we introduced RabbitMQ Topic Exchange. Today, JavaSampleApproach will show you how to work with SpringBoot RabbitMQ Headers Exchange.

Related posts:
SpringBoot RabbitMQ Topic Exchange
RabbitMQ – How to create Spring RabbitMQ Producer/Consumer applications with SpringBoot
RabbitMq – How to create Spring RabbitMq Publish/Subcribe pattern with SpringBoot
RabbitMQ – How to send/receive Java object messages with Spring RabbitMq | SpringBoot
SpringBoot RabbitMq Exchange to Exchange Topology

I. Technologies

– Java 8
– Maven 3.6.1
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.7.RELEASE
– RabbitMQ

II. RabbitMq Headers Exchange

A headers exchange routes messages based on message header values instead of routing keys.
Each binding carries a special argument named x-match, which takes one of 2 values {all, any}; all is the default value of a headers binding.
x-match = all means that every header value in the binding must match.
x-match = any means that just one matching header value is sufficient.

[Figure: SpringBoot RabbitMQ headers exchange - architecture design]

In this design, queue Q1 is bound with {x-match=all, layer=system, level=error} and queue Q2 is bound with {x-match=any, layer=application, level=error}.

Scenarios with the above design:
– A message sent with headers {layer=system, level=error} is delivered to 2 queues {Q1, Q2}.
– A message sent with headers {layer=application, level=error} is delivered to one queue, Q2.
– A message sent with headers {layer=system, level=info} matches neither binding and is discarded.
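The matching rule behind these scenarios can be sketched in plain Java. This is only an illustrative model, not RabbitMQ's actual implementation: the class HeadersMatchSketch and its methods are hypothetical names, and the two bindings (Q1 with x-match=all, Q2 with x-match=any) are assumptions inferred from the scenarios above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of headers-exchange matching; not RabbitMQ's real code. */
final class HeadersMatchSketch {

    /** Returns true when the message headers satisfy a binding's arguments. */
    static boolean matches(String xMatch, Map<String, String> binding, Map<String, String> headers) {
        int matched = 0;
        for (Map.Entry<String, String> arg : binding.entrySet()) {
            // A binding argument matches when the message has the same key with an equal value.
            if (arg.getValue().equals(headers.get(arg.getKey()))) {
                matched++;
            }
        }
        if ("any".equals(xMatch)) {
            return matched > 0;               // one matching header is sufficient
        }
        return matched == binding.size();     // "all" (the default): every argument must match
    }

    /** Small helper that builds a {layer, level} header map. */
    static Map<String, String> headers(String layer, String level) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("layer", layer);
        map.put("level", level);
        return map;
    }

    /** Routes a message against the two assumed bindings and returns the target queues. */
    static List<String> route(Map<String, String> messageHeaders) {
        List<String> queues = new ArrayList<>();
        // Assumed binding of Q1: x-match=all, layer=system, level=error
        if (matches("all", headers("system", "error"), messageHeaders)) {
            queues.add("Q1");
        }
        // Assumed binding of Q2: x-match=any, layer=application, level=error
        if (matches("any", headers("application", "error"), messageHeaders)) {
            queues.add("Q2");
        }
        return queues;
    }

    public static void main(String[] args) {
        System.out.println(route(headers("system", "error")));       // [Q1, Q2]
        System.out.println(route(headers("application", "error")));  // [Q2]
        System.out.println(route(headers("system", "info")));        // []
    }
}
```

An empty result corresponds to the third scenario: no binding matches, so the message is discarded.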

III. Practices

In this tutorial, we create 2 SpringBoot projects as below:

[Figure: SpringBoot RabbitMQ headers exchange - project structures]

Steps to do:
– Create SpringBoot projects
– Implement RabbitMq producer
– Implement RabbitMq consumer
– Run and check results

1. Create SpringBoot projects

Using SpringToolSuite, create 2 SpringBoot projects, then add the needed dependency spring-boot-starter-amqp:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Implement RabbitMq producer

2.1 Configure RabbitMq connection

Open the application.properties file and add the configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs.headers

2.2 Implement Producer


package com.javasampleapproach.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class Producer {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	@Value("${jsa.rabbitmq.exchange}")
	private String exchange;
	
	public void produce(Message message){
		
		/**
		 * void send(String exchange, String routingKey, Message message) throws AmqpException;
		 * A headers exchange ignores the routing key, so we set it to an empty String.
		 */
		amqpTemplate.send(exchange, "", message);
		System.out.println("Send msg = " + new String(message.getBody()));
	}
}

2.3 Implement Producer client


package com.javasampleapproach.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.rabbitmq.producer.Producer;


@SpringBootApplication
public class SpringRabbitMqProducerApplication  implements CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
	}
	
	@Autowired
	Producer producer;

	@Override
	public void run(String... args) throws Exception {
		
		/**
		 *  message 1
		 *  
		 *  This message will be delivered to 2 queues
		 *  {
		 *  	jsa.queue.logs.all-sys-error
		 *  	jsa.queue.logs.any-app-error
		 *  }
		 */
		String systemErrorLog = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
		Message sysErrMsg = MessageBuilder.withBody(systemErrorLog.getBytes())
											.setHeader("layer", "system")
											.setHeader("level", "error")
											.build();
		producer.produce(sysErrMsg);
		
		/**
		 *  message 2
		 *  
		 *  This message will be delivered to one queue: jsa.queue.logs.any-app-error
		 */
		String appErrorLog = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
		Message appErrMsg = MessageBuilder.withBody(appErrorLog.getBytes())
											.setHeader("layer", "application")
											.setHeader("level", "error")
											.build();
		producer.produce(appErrMsg);
		
		/**
		 *  message 3
		 *  
		 *  => This message will be discarded because it does NOT match the binding conditions of the 2 queues
		 *  {
		 *  	jsa.queue.logs.all-sys-error
		 *  	jsa.queue.logs.any-app-error
		 *  }
		 */
		String sysInfoLog = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
		Message sysInfoMsg = MessageBuilder.withBody(sysInfoLog.getBytes())
											.setHeader("layer", "system")
											.setHeader("level", "info")
											.build();
		producer.produce(sysInfoMsg);
	}
}

3. Implement RabbitMq consumer

3.1 Configure RabbitMq connection

Open the application.properties file and add the configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.queue.logs.all-sys-error
#jsa.rabbitmq.queue=jsa.queue.logs.any-app-error

3.2 Implement Consumer


package com.javasampleapproach.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
	
	@RabbitListener(queues="${jsa.rabbitmq.queue}")
    public void receivedMessage(byte[] log) {
        System.out.println("Recieved Message: " + new String(log));
    }
}

4. Run and check results

4.1 Setup RabbitMq exchange, queues

Enable the rabbitmq_management plugin with the command: rabbitmq-plugins enable rabbitmq_management --online
Then go to http://localhost:15672 and log in with user/password: guest/guest.

Add exchange:
Go to http://localhost:15672/#/exchanges, add an exchange named jsa.exchange.logs.headers with type headers.

[Figure: create exchange]

Add queues:
Go to http://localhost:15672/#/queues, add 2 queues: jsa.queue.logs.all-sys-error, jsa.queue.logs.any-app-error.

[Figure: create queues]

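– Bind the queues to the above exchange, each with its own arguments:
jsa.queue.logs.all-sys-error: x-match=all, layer=system, level=error
jsa.queue.logs.any-app-error: x-match=any, layer=application, level=error

[Figure: binding queues with exchange]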
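As an alternative to the management UI, the same exchange, queues and bindings could be declared from code. The following is a minimal sketch, assuming it is added to one of the two projects: the class name RabbitMqHeadersConfig and the helper headers(...) are hypothetical, and the binding arguments (x-match=all with layer=system, level=error; x-match=any with layer=application, level=error) are inferred from the scenarios in section II.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; it is not part of the original projects.
@Configuration
public class RabbitMqHeadersConfig {

    @Bean
    public HeadersExchange logsExchange() {
        return new HeadersExchange("jsa.exchange.logs.headers");
    }

    @Bean
    public Queue allSysErrorQueue() {
        return new Queue("jsa.queue.logs.all-sys-error");
    }

    @Bean
    public Queue anyAppErrorQueue() {
        return new Queue("jsa.queue.logs.any-app-error");
    }

    // x-match=all: layer=system AND level=error must both match.
    @Bean
    public Binding allSysErrorBinding() {
        return BindingBuilder.bind(allSysErrorQueue()).to(logsExchange())
                .whereAll(headers("system", "error")).match();
    }

    // x-match=any: layer=application OR level=error is enough.
    @Bean
    public Binding anyAppErrorBinding() {
        return BindingBuilder.bind(anyAppErrorQueue()).to(logsExchange())
                .whereAny(headers("application", "error")).match();
    }

    // Builds the header arguments of a binding (assumed values, see above).
    private static Map<String, Object> headers(String layer, String level) {
        Map<String, Object> map = new HashMap<>();
        map.put("layer", layer);
        map.put("level", level);
        return map;
    }
}
```

With Spring Boot's auto-configured AmqpAdmin, these beans should be declared on the broker when the application first opens a connection, so the manual steps above would not be needed in that case.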

4.2 Run & check results

– Run SpringBootRabbitMqProducer with the command line: mvn spring-boot:run

-> Console’s logs:


Send msg = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]
Send msg = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception
Send msg = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52

-> See queues’ status:

[Figure: queues' status after sending messages]

-> jsa.queue.logs.all-sys-error queue has 1 message:

[Figure: get messages of jsa.queue.logs.all-sys-error]

-> jsa.queue.logs.any-app-error queue has 2 messages:

[Figure: get messages of jsa.queue.logs.any-app-error]

– Run SpringBootRabbitMqConsumer, which listens to the jsa.queue.logs.all-sys-error queue, with configuration: jsa.rabbitmq.queue=jsa.queue.logs.all-sys-error:

-> Console’s logs:


Recieved Message: 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]

– Run SpringBootRabbitMqConsumer, which listens to the jsa.queue.logs.any-app-error queue, with configuration: jsa.rabbitmq.queue=jsa.queue.logs.any-app-error:

-> Console’s logs:


Recieved Message: 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]
Recieved Message: 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception

IV. Sourcecode

SpringRabbitMqProducer
SpringRabbitMqConsumer



By grokonez | October 11, 2017.

Last updated on April 25, 2021.


