ActiveMq – How to work with Spring JMS ActiveMq Topic (Publisher-Subcribers pattern) using SpringBoot

The previous tutorials, We had learned how to create a Spring JMS applications for working with ActiveMq Queue. In the tutorial, JavaSampleApproach will guide you how to create Spring JMS applications for working with ActiveMq Topic.

Related articles:
How to use Spring JMS with ActiveMQ – JMS Consumer and JMS Producer | Spring Boot
Spring Jms ActiveMq – How to send Java object messages to ActiveMQ server (specially with Bi-Directional relationship Java objects)
ActiveMq – Explicitly configure Spring ActiveMq ConnectionFactory with SpringBoot
How to resolve Json Infinite Recursion problem when working with Jackson


I. Spring Jms ActiveMq Topic

ActiveMq provides the Publish-Subscribe pattern (pub-sub) for building Jms message distributed systems.
How it work? -> When you publish a messages, all active subscribers will receive a copy of the message.

spring activeMq - publiser subcriber

With SpringBoot application, we need to enable pubSubDomain (.setPubSubDomain(true)) for 2 beans {JmsTemplate, JmsListenerContainerFactory}:

@Bean
public JmsListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    ...
    factory.setPubSubDomain(true);
    ...
    return factory;
}

@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate template = new JmsTemplate();
	...
    template.setPubSubDomain(true);
    return template;
}

And set spring.jms.pub-sub-domain=true in application.properties file.

II. Practice

In the tutorial, we use SpringBoot to create 2 applications: PublisherSubcriber.

Technologies
– Java 8
– Maven 3.6.1
– Spring Tool Suite: Version 3.8.4.RELEASE
– Spring Boot: 1.5.4.RELEASE
– Apache ActiveMQ 5.14.0

spring jms activeMq topic- project structure

Step to do:
– Create SpringBoot projects
– Create Java message models
– Configure ConnectionFactory
– Create Jms Publisher/Subcriber
– Implement Client for Publisher
– Run and check results

1. Create SpringBoot projects

Using SpringToolSuite to create 2 SpringBoot projects: one for Publisher, one for Subcriber. Then add dependencies for both of them:


	org.springframework.boot
	spring-boot-starter-activemq


         com.fasterxml.jackson.core
         jackson-databind


	org.json
	json

2. Create Java message models

Create 2 Java message models: Company & Product with one-to-many relationship:

– Company

package com.javasampleapproach.activemq.models;

import java.util.List;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
 
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Company.class)
public class Company {
    private String name;
 
    private List products;
	
    public Company(){
    }
    
    public Company(String name, List products){
    	this.name = name;
    	this.products = products;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setProducts(List products){
    	this.products = products;
    }
    
    public List getProducts(){
    	return this.products;
    }
 
	/**
	 * 
	 * Show Detail View
	 */
	public String toString(){
		JSONObject jsonInfo = new JSONObject();
		
		try {
			jsonInfo.put("name", this.name);
 
			JSONArray productArray = new JSONArray();
			if (this.products != null) {
				this.products.forEach(product -> {
					JSONObject subJson = new JSONObject();
					try {
						subJson.put("name", product.getName());
					} catch (JSONException e) {}
					
					productArray.put(subJson);
				});
			}
			jsonInfo.put("products", productArray);
		} catch (JSONException e1) {}
		return jsonInfo.toString();
	}
 
}

– Product

package com.javasampleapproach.activemq.models;
nnotations to handle the
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
 
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class,property="@id", scope = Product.class)
public class Product {
    private String name;
    
    private Company company;
	
    public Product(){
    }
    
    public Product(String name){
    	this.name = name;
    }
    
    public Product(String name, Company company){
    	this.name = name;
    	this.company = company;
    }
    
    // name
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    // products
    public void setCompany(Company company){
    	this.company = company;
    }
    
    public Company getCompany(){
    	return this.company;
    }
}

Note: We use @JsonIdentityInfo annotation to handle the Infinite Recursion problem for serializing Bi-Directional relationship objects with Jackson.

3. Configure ConnectionFactory

We create ConnectionFactoryConfig for both Publisher and Subcriber.

package com.javasampleapproach.activemq.config;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
@EnableJms
public class ConnectionFactoryConfig {
	@Value("${jsa.activemq.broker.url}")
	String brokerUrl;
	
	@Value("${jsa.activemq.borker.username}")
	String userName;
	
	@Value("${jsa.activemq.borker.password}")
	String password;

	/*
	 * Initial ConnectionFactory
	 */
    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(brokerUrl);
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }
    
	@Bean // Serialize message content to json using TextMessage
	public MessageConverter jacksonJmsMessageConverter() {
	    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
	    converter.setTargetType(MessageType.TEXT);
	    converter.setTypeIdPropertyName("_type");
	    return converter;
	}
    
	...
}

We use jacksonJmsMessageConverter bean to serialize Java object messages.
>>> See more at: How to send Java object messages to ActiveMQ server.

With Publisher application, we configure additional jmsTemplate bean in ConnectionFactoryConfig file as below code:

@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setMessageConverter(jacksonJmsMessageConverter());
    template.setPubSubDomain(true);
    return template;
}

With Subcriber application, we configure additional jsaFactory bean in ConnectionFactoryConfig file as below code:

@Bean
public JmsListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true);
    factory.setMessageConverter(jacksonJmsMessageConverter());
    configurer.configure(factory, connectionFactory);
    return factory;
}
4. Create Jms Publisher/Subcriber

With Publisher application, create a JmsPublisher component as below:

package com.javasampleapproach.activemq.jms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.javasampleapproach.activemq.models.Company;

@Component
public class JmsPublisher {
	@Autowired
	JmsTemplate jmsTemplate;
	
	@Value("${jsa.activemq.topic}")
	String topic;
	
	public void send(Company apple){
		jmsTemplate.convertAndSend(topic, apple);
	}
}

With Subcriber application, create a JmsPublisher component as below:

package com.javasampleapproach.activemq.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import com.javasampleapproach.activemq.models.Company;


@Component
public class JmsSubcriber {
	
	@JmsListener(destination = "${jsa.activemq.topic}")
	public void receive(Company msg){
		System.out.println("Recieved Message: " + msg);
	}
}

5. Implement Client for Publisher

In main class SpringActiveMqTopicProducerApplication, we use CommandLineRunner interface to implement code:
– Initial 2 Company object messages {apple, samsung}
– Sending them to ActiveMQ Topic server.

package com.javasampleapproach.activemq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.activemq.jms.JmsPublisher;
import com.javasampleapproach.activemq.models.Company;
import com.javasampleapproach.activemq.models.Product;

@SpringBootApplication
public class SpringActiveMqTopicProducerApplication implements CommandLineRunner {

	@Autowired
	JmsPublisher publisher;
	
	public static void main(String[] args) {
		SpringApplication.run(SpringActiveMqTopicProducerApplication.class, args);
	}
	
	@Override
	public void run(String... args) throws Exception {
		/*
		 * Apple company & products
		 */
		// initial company and products 
		Product iphone7 = new Product("Iphone 7");
		Product iPadPro = new Product("IPadPro");
		
		List appleProducts = new ArrayList(Arrays.asList(iphone7, iPadPro));
		
		Company apple = new Company("Apple", appleProducts);
		
		iphone7.setCompany(apple);
		iPadPro.setCompany(apple);
		
		// send message to ActiveMQ
		publisher.send(apple);
        
        /*
         * Samsung company and products
         */
		Product galaxyS8 = new Product("Galaxy S8");
		Product gearS3 = new Product("Gear S3");
		
		List samsungProducts = new ArrayList(Arrays.asList(galaxyS8, gearS3));
		
		Company samsung = new Company("Samsung", samsungProducts);
		
		galaxyS8.setCompany(samsung);
		gearS3.setCompany(samsung);
		
        /*
         * send message to ActiveMQ
         */
		publisher.send(samsung);
	}
}

6. Run and check results

– Start ActiveMQ server with commandline: C:\apache-activemq-5.14.5>.\bin\activemq start.
– Build and Run the SpringBoot applications by commandlines: {mvn clean install, mvn spring-boot:run} with the following order:

6.1 Enable one active subscriber

– Start an instance of SpringBoot Subcriber on ActiveMQ topic: jsa-topic
– Then start an instance of SpringBoot Publiser.

-> We receive 2 Company messages {apple, samsung}. See the subscriber’s console logs:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}
6.2 Enable two active subcribers

Again, we start another instance of SpringBoot Subcriber on ActiveMQ topic: jsa-topic.
-> Now having 2 active subcribers on jsa-topic topic.

Start the SpringBoot Publiser application again for sending messages.

-> Results: we receive 2 new Company messages for each Subcribers.
So with the first Subcriber, it had recieved total 4 Company messages:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}
Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}

And with the second Subcriber (the newer), it had recieved 2 Company messages:

Recieved Message: {"name":"Apple","products":[{"name":"Iphone 7"},{"name":"IPadPro"}]}
Recieved Message: {"name":"Samsung","products":[{"name":"Galaxy S8"},{"name":"Gear S3"}]}

Spring Jms Applications (Publisher – Subcribers) are working fine with ActiveMq server! -> Let’s start now!

III. Sourcecode

SpringActiveMqTopicPublisher
SpringActiveMqTopicSubcriber



By grokonez | July 3, 2017.


Related Posts


4 thoughts on “ActiveMq – How to work with Spring JMS ActiveMq Topic (Publisher-Subcribers pattern) using SpringBoot”

  1. You get into trouble when setting configurer.configure(factory, connectionFactory); as it would use the application.properties to set the pub-sub-domain. If spring.jms.pub-sub-domain=false for example, the default configurer will set the pub sub domain back to false. In which case the listener will be unable to consume from the queue.

Got Something To Say:

Your email address will not be published. Required fields are marked *

*