How to start Spring Apache Kafka Application with SpringBoot Auto-Configuration

The Spring Apache Kafka (spring-kafka) provides a high-level abstraction for Kafka-based messaging solutions. And Spring Boot 1.5 includes auto-configuration support for Apache Kafka via the spring-kafka project. So in the tutorial, JavaSampleApproach will show you how to start Spring Apache Kafka Application with SpringBoot.

Related Articles:
How to start Apache Kafka
How to start Spring Kafka Application with Spring Boot
Spring JMS – Explicitly Configure Spring ActiveMQ ConnectionFactory | SpringBoot

I. Technologies

– Java 8
– Maven build
– Spring Boot
– Apache Kafka
– Spring Tool Suite editor

II. Overview

As above mention, SpringBoot supports auto-configuration for Apache Kafka development:
– Use spring.kafka.* in application.properties file to modify external configuration.


spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=jsa-group

Spring Apache Kafka Application

– Use Spring auto-configured KafkaTemplate to send Kafka-based messages:


@Autowired
private KafkaTemplate kafkaTemplate;

public void send(String data) {
    ...   
    kafkaTemplate.send(kafkaTopic, data);
}

– Use @KafkaListener to setup a Kafka listener:


@Component
public class KafkaConsumer {
    ...
	
	@KafkaListener(topics="${jsa.kafka.topic}")
    public void processMessage(String content) {
		...
    }
}

III. Practice

We create a SpringBoot project with 2 main services: KafkaProducer and KafkaConsumer for sending and receiving messages from Apache Kafka cluster.
And export 2 RestAPIs {‘/producer’, ‘/consumer’} for interaction.

SpringBoot Kafka Application - project structure

Step to do:
– Create a SpringBoot project
– Create Kafka Producer and Consumer
– Add Apache Kafka external configuration
– Export some RestAPIs
– Deployment

1. Create a SpringBoot project

Use SpringToolSuite to create a SpringBoot project, then add dependencies {spring-kafka, spring-boot-starter-web}:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. Create Kafka Producer and Consumer

– Create a KafkaProducer service:


package com.javasampleapproach.apachekafka.services;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
	private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
	
	@Autowired
	private KafkaTemplate kafkaTemplate;
	
	@Value("${jsa.kafka.topic}")
	String kafkaTopic = "jsa-test";
	
	public void send(String data) {
	    log.info("sending data='{}'", data);
	    
	    kafkaTemplate.send(kafkaTopic, data);
	}
}

– Create a KafkaConsumer service:


package com.javasampleapproach.apachekafka.services;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.javasampleapproach.apachekafka.storage.MessageStorage;

@Component
public class KafkaConsumer {
	private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

	@Autowired
	MessageStorage storage;
	
	@KafkaListener(topics="${jsa.kafka.topic}")
    public void processMessage(String content) {
		log.info("received content = '{}'", content);
		storage.put(content);
    }
}

About MessageStorage, it is an additional implement to store Kafka-based messages after received. See details the implementation of MessageStorage:


package com.javasampleapproach.apachekafka.storage;

import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

@Component
public class MessageStorage {
	
	private List storage = new ArrayList();
	
	public void put(String message){
		storage.add(message);
	}
	
	public String toString(){
		StringBuffer info = new StringBuffer();
		storage.forEach(msg->info.append(msg).append("
")); return info.toString(); } public void clear(){ storage.clear(); } }

3. Add Apache Kafka external configuration

Open application.properties file, add the configurations:


spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=jsa-group
jsa.kafka.topic=jsa-springboot-topic

spring.kafka.bootstrap-servers is used to indicate the Kafka Cluster address.
spring.kafka.consumer.group-id is used to indicate the consumer-group-id.
jsa.kafka.topic is an additional configuration. In the the tutorial, we use jsa.kafka.topic to define a Kafka topic name to produce and receive messages.

4. Export some RestAPIs

Create a Web Controller to export 2 RestAPIs {‘/producer’, ‘/consumer’}


package com.javasampleapproach.apachekafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.javasampleapproach.apachekafka.services.KafkaProducer;
import com.javasampleapproach.apachekafka.storage.MessageStorage;

@RestController
@RequestMapping(value="/jsa/kafka")
public class WebRestController {
	
	@Autowired
	KafkaProducer producer;
	
	@Autowired
	MessageStorage storage;
	
	@GetMapping(value="/producer")
	public String producer(@RequestParam("data")String data){
		producer.send(data);
		
		return "Done";
	}
	
	@GetMapping(value="/consumer")
	public String getAllRecievedMessage(){
		String messages = storage.toString();
		storage.clear();
		
		return messages;
	}
}

/producer is used to send messages from browser to KafkaProducer service.
/consumer is used to get all recieved messages that are buffered in MessageStorage.

5. Deployment

Start Apache Kafka Cluster:
– Start a ZooKeeper:


C:\kafka_2.12-0.10.2.1>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

– Start the Apache Kafka server:


.\bin\windows\kafka-server-start.bat .\config\server.properties

>>> More details at: How to start Apache Kafka

Build and Install the SpringBoot project with commandlines: mvn clean install and mvn spring-boot:run

– Make a producer request: http://localhost:8080/jsa/kafka/producer?data=Hello World
-> Logs:


...

2017-06-08 13:49:47.111  INFO 12240 --- [io-8080-exec-10] c.j.apachekafka.services.KafkaProducer   : sending data='Hello World'
...
2017-06-08 13:49:47.248  INFO 12240 --- [ntainer#0-0-L-1] c.j.apachekafka.services.KafkaProducer   : received content = 'Hello World'

– Make another producer request: http://localhost:8080/jsa/kafka/producer?data=This is a SpringBoot Kafka Application

-> Logs:


2017-06-08 13:51:34.909  INFO 12240 --- [nio-8080-exec-7] c.j.apachekafka.services.KafkaProducer   : sending data='This is a SpringBoot
Kafka Application'
2017-06-08 13:51:34.913  INFO 12240 --- [ntainer#0-0-L-1] c.j.apachekafka.services.KafkaProducer   : received content = 'This is a Sprin
gBoot Kafka Application'

– Make a consumer request: http://localhost:8080/jsa/kafka/consumer, result:

Spring Appache Kafka - consumer request

IV. Sourcecode

SpringBootApacheKafka



By grokonez | June 8, 2017.

Last updated on April 18, 2021.



Related Posts


5 thoughts on “How to start Spring Apache Kafka Application with SpringBoot Auto-Configuration”

  1. Hi, i am getting error,
    Failed to execute goal org.springframework.boot:spring-boot-maven-plugin:1.5.1.RELEASE:run (default-cli) on project Messaging: Unable to find a suitable main class, please add a ‘mainClass’ property -> [Help 1]

  2. spring -boot version is old here in the pom..also the jars need to be updated….does anyone have an updated pom on this project that works??
    I updated the spring boot version to
    <!– 1.5.4.RELEASE –>
    2.0.2.RELEASE

    compiles alirght but gave test failure
    [INFO]
    [INFO] ——————————————————-
    [INFO] T E S T S
    [INFO] ——————————————————-
    [INFO] Running com.javasampleapproach.apachekafka.SpringApacheKafkaApplicationTests
    [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.042 s <<< FAILURE! – in com.javasampleapproach.apachekafka.SpringApacheKafkaApplicationTests
    [ERROR] initializationError(com.javasampleapproach.apachekafka.SpringApacheKafkaApplicationTests) Time elapsed: 0.014 s <<< ERROR!
    java.lang.TypeNotPresentException: Type org.springframework.test.context.junit4.SpringRunner not present
    Caused by: java.lang.ClassNotFoundException: org.springframework.test.context.junit4.SpringRunner

  3. How would it scale ?, it would run out of memory in about 15 mins ? Reading a message queue into an object and then reading the object isn’t going to be in any fault tolerant.

    You would be better with Kafka Streams to achieve this, or at least cache them in a h2 database so its a common resource for all users.

Got Something To Say:

Your email address will not be published. Required fields are marked *

*