Spring Data Redis Messaging – PubSub example | Spring Boot

The previous post introduced Redis CRUD operations. In this tutorial, we look at how to implement Redis Pub/Sub messaging with Spring Boot.

Related Post: Spring Data Redis CRUD Operations example with Spring Boot

I. Technology

– Java 1.8
– Maven 3.3.9
– Spring Tool Suite – Version 3.9.0.RELEASE
– Spring Boot: 1.5.9.RELEASE

II. Spring Data Redis Messaging – PubSub

1. Overview

Spring Data provides dedicated messaging integration for Redis that can be roughly divided into two areas of functionality: production (or publication) and consumption (or subscription) of messages.

– The Publisher does not know which subscribers will receive its messages. Instead, it specifies only a channel.
– The Subscriber receives messages from one or more channels (topics), without any knowledge of the Publisher.
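To make the decoupling above concrete, here is a minimal in-memory sketch of the Pub/Sub idea in plain Java. It is not Redis and not Spring Data Redis code: InMemoryBroker, InMemoryBrokerDemo and the two extra channel names are hypothetical, made up for illustration only. It just shows that a publisher names a channel, never a subscriber, and that one subscriber may listen on several channels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a pub/sub broker (assumption, not a Redis API).
class InMemoryBroker {

	// channel name -> listeners currently subscribed to that channel
	private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

	// A subscriber registers interest in a channel; it never names a publisher.
	void subscribe(String channel, Consumer<String> listener) {
		listeners.computeIfAbsent(channel, key -> new CopyOnWriteArrayList<>()).add(listener);
	}

	// A publisher names only the channel; returns how many listeners got the message.
	int publish(String channel, String message) {
		List<Consumer<String>> targets = listeners.getOrDefault(channel, new ArrayList<>());
		targets.forEach(listener -> listener.accept(message));
		return targets.size();
	}
}

class InMemoryBrokerDemo {

	// Subscribers A and B share one channel; A also listens on a second channel.
	static List<String> run() {
		InMemoryBroker broker = new InMemoryBroker();
		List<String> received = new ArrayList<>();

		broker.subscribe("pubsub:jsa-channel", msg -> received.add("A:" + msg));
		broker.subscribe("pubsub:jsa-channel", msg -> received.add("B:" + msg));
		broker.subscribe("pubsub:other-channel", msg -> received.add("A:" + msg));

		broker.publish("pubsub:jsa-channel", "hello");
		broker.publish("pubsub:other-channel", "world");
		broker.publish("pubsub:unused-channel", "dropped"); // nobody is subscribed here
		return received;
	}

	public static void main(String[] args) {
		System.out.println(run());
	}
}
```

Calling InMemoryBrokerDemo.run() returns [A:hello, B:hello, A:world]: both subscribers of pubsub:jsa-channel get "hello", and the message sent to a channel without subscribers is simply dropped.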

2. Maven Dependency

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3. Redis Configuration

We need:
– a connection factory -> JedisConnectionFactory
– a channel -> ChannelTopic
– a template for the publisher to publish messages -> RedisTemplate
– a listener container for the consumer to receive messages -> RedisMessageListenerContainer (with the listener wrapped in a MessageListenerAdapter)

@Bean
JedisConnectionFactory jedisConnectionFactory() {
	return new JedisConnectionFactory();
}

@Bean
public RedisTemplate redisTemplate() {
	final RedisTemplate template = new RedisTemplate();
	template.setConnectionFactory(jedisConnectionFactory());
	template.setValueSerializer(new GenericToStringSerializer(Object.class));
	return template;
}

@Bean
MessageListenerAdapter messageListener() {
	return new MessageListenerAdapter(new CustomerInfoSubscriber());
}

@Bean
RedisMessageListenerContainer redisContainer() {
	final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
	container.setConnectionFactory(jedisConnectionFactory());
	container.addMessageListener(messageListener(), topic());
	container.setTaskExecutor(Executors.newFixedThreadPool(4));
	return container;
}

@Bean
CustomerInfoPublisher redisPublisher() {
	return new RedisCustomerInfoPublisher(redisTemplate(), topic());
}

@Bean
ChannelTopic topic() {
	return new ChannelTopic("pubsub:jsa-channel");
}

We can also supply an executor to RedisMessageListenerContainer (as setTaskExecutor() does above) so that incoming messages are dispatched to the listener asynchronously, on the executor's threads.
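As a rough sketch in plain Java of what handing work to an executor means for the listener: the callback runs on a pool thread rather than on the thread that submitted it. AsyncDispatchSketch and dispatch() are hypothetical names used only here; the real container does considerably more than this.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (assumption): mimics "run the listener on a pool thread".
class AsyncDispatchSketch {

	// Runs the handler on a pool thread and returns the name of that thread.
	static String dispatch(ExecutorService pool, String message) {
		Future<String> handledBy = pool.submit(() -> {
			// Stand-in for a listener callback such as onMessage(...)
			String worker = Thread.currentThread().getName();
			System.out.println("Received >> " + message + ", " + worker);
			return worker;
		});
		try {
			return handledBy.get(5, TimeUnit.SECONDS);
		} catch (Exception e) {
			throw new IllegalStateException("listener did not finish", e);
		}
	}

	public static void main(String[] args) {
		ExecutorService pool = Executors.newFixedThreadPool(4);
		try {
			String caller = Thread.currentThread().getName();
			String worker = dispatch(pool, "Customer[id=1]");
			System.out.println("published on " + caller + ", handled on " + worker);
		} finally {
			pool.shutdown();
		}
	}
}
```

With the default thread factory the worker name has the form pool-N-thread-M, so the handling thread is easy to tell apart from the publishing thread in the console.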

4. Publish Message

We use the convertAndSend() method of RedisTemplate to serialize the message and publish it to the topic's channel:

 
RedisTemplate redisTemplate;
ChannelTopic topic;

public void publish() {
	redisTemplate.convertAndSend(topic.getTopic(), message);
}

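When a message is published, it is delivered to every subscriber currently listening on that channel. Redis Pub/Sub does not store messages, so a subscriber that connects later does not receive earlier ones.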
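Before the message goes over the wire, the template's value serializer (GenericToStringSerializer in our configuration) turns it into bytes. Roughly, and only as an approximation of that step, this amounts to converting the object to text and encoding the text as UTF-8. ToStringPayloadSketch and serialize() below are hypothetical names, not Spring classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical approximation (assumption) of a to-string value serializer.
class ToStringPayloadSketch {

	// object -> text -> UTF-8 bytes; null stays null
	static byte[] serialize(Object value) {
		if (value == null) {
			return null;
		}
		return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
	}

	public static void main(String[] args) {
		byte[] payload = serialize("Customer[id=1, firstName='Jack', lastName='Smith']");
		System.out.println(payload.length + " bytes ready to publish");
	}
}
```

This is also why a payload that is already a String is a convenient choice for a first example: its text form needs no further conversion.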

5. Subscribe to Message

We implement the MessageListener interface provided by Spring Data Redis and handle each received Message in the onMessage() method:

 
class CustomerInfoSubscriber implements MessageListener {

	@Override
	public void onMessage(Message message, byte[] pattern) {
		// handle message...
	}
}
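The Message passed to onMessage() exposes its content as raw bytes through getBody() and getChannel(). A minimal sketch of turning those bytes back into text could look like this, assuming the publisher sent UTF-8 text. PayloadDecodingSketch and describe() are hypothetical names; inside onMessage() we might call describe(message.getChannel(), message.getBody()).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (assumption): decodes the byte arrays a listener receives.
class PayloadDecodingSketch {

	// Turns the raw channel and body bytes of a received message into readable text.
	static String describe(byte[] channel, byte[] body) {
		String channelName = new String(channel, StandardCharsets.UTF_8);
		String payload = new String(body, StandardCharsets.UTF_8);
		return "[" + channelName + "] " + payload;
	}

	public static void main(String[] args) {
		byte[] channel = "pubsub:jsa-channel".getBytes(StandardCharsets.UTF_8);
		byte[] body = "Customer[id=1]".getBytes(StandardCharsets.UTF_8);
		System.out.println(describe(channel, body)); // [pubsub:jsa-channel] Customer[id=1]
	}
}
```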

III. Practice

1. Project Structure

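com.javasampleapproach.redis.pubsub
– SpringDataRedisPubSubApplication
– config: RedisConfig
– consumer: CustomerInfoSubscriber
– model: Customer
– producer: CustomerInfoPublisher, RedisCustomerInfoPublisher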

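2. Step by Step

1. Create Spring Boot project & add Dependencies

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>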

2. Create DataModel Class

Under package model, create Customer class:

package com.javasampleapproach.redis.pubsub.model;

import java.io.Serializable;

public class Customer implements Serializable {

	private static final long serialVersionUID = 1L;

	private long id;
	private String firstName;
	private String lastName;

	public Customer() {
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	public Customer(long id, String firstName, String lastName) {
		this.id = id;
		this.firstName = firstName;
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		return String.format("Customer[id=%d, firstName='%s', lastName='%s']", id, firstName, lastName);
	}
}

3. Create Publisher

Under package producer:
– CustomerInfoPublisher interface:

package com.javasampleapproach.redis.pubsub.producer;

public interface CustomerInfoPublisher {
	
	void publish();
}

– And its implementation:

package com.javasampleapproach.redis.pubsub.producer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

import com.javasampleapproach.redis.pubsub.model.Customer;

public class RedisCustomerInfoPublisher implements CustomerInfoPublisher {

	List<Customer> customers = new ArrayList<>(Arrays.asList(new Customer(1, "Jack", "Smith"),
			new Customer(2, "Adam", "Johnson"), new Customer(3, "Kim", "Smith"), new Customer(4, "David", "Williams"),
			new Customer(5, "Peter", "Davis")));

	private final AtomicInteger counter = new AtomicInteger(0);

	@Autowired
	private RedisTemplate redisTemplate;

	@Autowired
	private ChannelTopic topic;

	public RedisCustomerInfoPublisher() {
	}

	public RedisCustomerInfoPublisher(RedisTemplate redisTemplate, ChannelTopic topic) {
		this.redisTemplate = redisTemplate;
		this.topic = topic;
	}

	@Override
	public void publish() {
		Customer customer = customers.get(counter.getAndIncrement());
		System.out.println(
				"Publishing... customer with id=" + customer.getId() + ", " + Thread.currentThread().getName());

		redisTemplate.convertAndSend(topic.getTopic(), customer.toString());
	}

}
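Note that publish() above reads customers.get(counter.getAndIncrement()), so with five customers in the list a sixth call would throw an IndexOutOfBoundsException. If we wanted to keep publishing, one option is to wrap the index around. The sketch below shows that idea in isolation; RoundRobinPicker is a hypothetical helper and is not part of the example project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (assumption): cycles through a fixed list forever.
class RoundRobinPicker<T> {

	private final List<T> items;
	private final AtomicInteger counter = new AtomicInteger(0);

	RoundRobinPicker(List<T> items) {
		if (items.isEmpty()) {
			throw new IllegalArgumentException("items must not be empty");
		}
		this.items = items;
	}

	// Returns the next item, wrapping around to the first one after the last.
	// floorMod keeps the index non-negative even if the counter overflows.
	T next() {
		int index = Math.floorMod(counter.getAndIncrement(), items.size());
		return items.get(index);
	}

	public static void main(String[] args) {
		RoundRobinPicker<String> picker = new RoundRobinPicker<>(Arrays.asList("Jack", "Adam", "Kim"));
		for (int i = 0; i < 4; i++) {
			System.out.println(picker.next()); // Jack, Adam, Kim, Jack
		}
	}
}
```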

4. Create Subscriber

Under package consumer:

package com.javasampleapproach.redis.pubsub.consumer;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

public class CustomerInfoSubscriber implements MessageListener {

	@Override
	public void onMessage(Message message, byte[] pattern) {
		System.out.println("Received >> " + message +  ", " + Thread.currentThread().getName() );
	}
}

5. Configure Spring Data Redis

Under package config, create RedisConfig class:

package com.javasampleapproach.redis.pubsub.config;

import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;

import com.javasampleapproach.redis.pubsub.consumer.CustomerInfoSubscriber;
import com.javasampleapproach.redis.pubsub.producer.CustomerInfoPublisher;
import com.javasampleapproach.redis.pubsub.producer.RedisCustomerInfoPublisher;

@Configuration
@ComponentScan("com.javasampleapproach.redis.pubsub")
public class RedisConfig {

	@Bean
	JedisConnectionFactory jedisConnectionFactory() {
		return new JedisConnectionFactory();
	}

	@Bean
	public RedisTemplate redisTemplate() {
		final RedisTemplate template = new RedisTemplate();
		template.setConnectionFactory(jedisConnectionFactory());
		template.setValueSerializer(new GenericToStringSerializer(Object.class));
		return template;
	}

	@Bean
	MessageListenerAdapter messageListener() {
		return new MessageListenerAdapter(new CustomerInfoSubscriber());
	}

	@Bean
	RedisMessageListenerContainer redisContainer() {
		final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(jedisConnectionFactory());
		container.addMessageListener(messageListener(), topic());
		container.setTaskExecutor(Executors.newFixedThreadPool(4));
		return container;
	}

	@Bean
	CustomerInfoPublisher redisPublisher() {
		return new RedisCustomerInfoPublisher(redisTemplate(), topic());
	}

	@Bean
	ChannelTopic topic() {
		return new ChannelTopic("pubsub:jsa-channel");
	}
}

6. Application Class

package com.javasampleapproach.redis.pubsub;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.javasampleapproach.redis.pubsub.producer.CustomerInfoPublisher;

@SpringBootApplication
public class SpringDataRedisPubSubApplication implements CommandLineRunner {

	@Autowired
	private CustomerInfoPublisher redisPublisher;

	public static void main(String[] args) {
		SpringApplication.run(SpringDataRedisPubSubApplication.class, args);
	}

	@Override
	public void run(String... arg0) throws Exception {
		redisPublisher.publish();
		redisPublisher.publish();
		redisPublisher.publish();
		Thread.sleep(50);
		redisPublisher.publish();
		redisPublisher.publish();
	}
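}

7. Run Spring Boot Application & Check Result

– Make sure a Redis server is running on localhost:6379, the default address used by new JedisConnectionFactory().
– Build the project with the Maven goals: clean install
– Run the project as a Spring Boot App
– Check the results: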

Publishing... customer with id=1, main
Publishing... customer with id=2, main
Publishing... customer with id=3, main
Received >> Customer[id=1, firstName='Jack', lastName='Smith'], pool-1-thread-2
Received >> Customer[id=2, firstName='Adam', lastName='Johnson'], pool-1-thread-3
Received >> Customer[id=3, firstName='Kim', lastName='Smith'], pool-1-thread-4
Publishing... customer with id=4, main
Publishing... customer with id=5, main
Received >> Customer[id=4, firstName='David', lastName='Williams'], pool-1-thread-2
Received >> Customer[id=5, firstName='Peter', lastName='Davis'], pool-1-thread-2

IV. Source Code

SpringDataRedisPubSub



By grokonez | January 3, 2018.

Last updated on September 4, 2018.



2 thoughts on “Spring Data Redis Messaging – PubSub example | Spring Boot”

  1. Hi Team,

    Need help asap.

    Caused by: org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

    Many Thanks
    Sushil
