Spring Batch Partition for Scaling & Parallel Processing

For scaling and parallel processing, Spring Batch provides several solutions: Multi-threaded Step, Parallel Steps, Remote Chunking of a Step, and Partitioning a Step. In this tutorial, JavaSampleApproach will introduce Partitioning a Step clearly through a sample project.

Related articles:
Spring Batch Job with Parallel Steps
How to use Spring Batch Late Binding – Step Scope & Job Scope


I. SPRING BATCH PARTITION

Spring Batch provides a solution for partitioning a Step execution so that it can run remotely, or be configured easily for local multi-threaded processing.

How does it work?

[Image: Partitioning overview]

The Job on the left-hand side is executed sequentially; the Master step is the partitioning step, which fans out to a number of Slave steps. Slave steps can be remote services or local threads.

For configuring a partitioning step, Spring Batch provides the PartitionHandler component and the Partitioner interface.

1. PartitionHandler

The PartitionHandler component knows about the fabric of the remoting services (RMI remoting, EJB remoting,… or local threads) and the grid size. A PartitionHandler can send StepExecution requests to the remote Steps in various formats, such as a DTO.

How to configure?

<!-- partitioner job -->
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">

	<!-- master step, 5 threads (grid-size) -->
        ...
	<step id="masterStep">
		<partition step="slave" partitioner="partitioner">
			<handler grid-size="5" task-executor="taskExecutor" />
		</partition>
	</step>
        ...
</job>
...
<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
...

The gridSize defines the number of step executions, so we should match it against the size of the TaskExecutor's thread pool.
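Note that SimpleAsyncTaskExecutor creates a new thread for every step execution, so a large grid size means just as many threads. If that is a concern, a bounded pool can be swapped in; below is a minimal sketch (bean name and pool sizes are illustrative) using Spring's ThreadPoolTaskExecutor in Java config:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class TaskExecutorConfig {

	// Caps concurrent slave-step executions at 5 even if grid-size is larger;
	// extra partitions wait in the queue until a pool thread is free.
	@Bean
	public ThreadPoolTaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(5);
		executor.setMaxPoolSize(5);
		executor.setThreadNamePrefix("partition-");
		return executor;
	}
}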

2. Partitioner

The Partitioner interface is used to build execution contexts as input parameters for step executions.


public interface Partitioner {
	Map<String, ExecutionContext> partition(int gridSize);
}

The returned Map contains a unique name for each step execution, associated with that execution's ExecutionContext.
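For illustration, here is a minimal sketch of building such a map for this tutorial's use case (the class name is hypothetical; the real implementation is SamplePartitioner in Practice step 5 below):

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ExecutionContext;

public class ExamplePartitionMap {

	// Illustrative only: one uniquely-named ExecutionContext per partition.
	public static Map<String, ExecutionContext> build(int gridSize) {
		Map<String, ExecutionContext> partitions = new HashMap<>();
		for (int i = 0; i < gridSize; i++) {
			ExecutionContext context = new ExecutionContext();
			// slave steps later read this via #{stepExecutionContext[filename]}
			context.putString("filename", "customer-data-" + (i + 1) + ".csv");
			partitions.put("partition" + i, context);
		}
		return partitions;
	}
}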

3. How to bind the input Map to Steps

The StepScope feature of Spring Batch helps us late-bind data from the PartitionHandler to each step at runtime.
See more: How to use Spring Batch Late Binding – Step Scope & Job Scope

...
<beans:bean id="itemReader"
	class="com.javasampleapproach.partitioning.step.Reader"
	factory-method="reader" scope="step">
	<beans:constructor-arg value="#{stepExecutionContext[filename]}" />
</beans:bean>

<beans:bean id="itemProcessor"
	class="com.javasampleapproach.partitioning.step.Processor" scope="step">
	<beans:property name="threadName" value="#{stepExecutionContext[name]}" />
</beans:bean>
...
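If you prefer Java configuration over the XML above, the same late binding can be expressed with @StepScope and @Value (a sketch under the assumption of a Java-config setup; this tutorial itself uses the XML):

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.javasampleapproach.partitioning.model.Customer;
import com.javasampleapproach.partitioning.step.Reader;

@Configuration
public class StepBeansConfig {

	// Step-scoped: a new reader is created for each step execution, with the
	// filename late-bound from that execution's ExecutionContext.
	@Bean
	@StepScope
	public FlatFileItemReader<Customer> itemReader(
			@Value("#{stepExecutionContext['filename']}") String filename) {
		return Reader.reader(filename);
	}
}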

II. PRACTICE

In this tutorial, we create a Batch Job that has one partitioned step with 5 slave steps, inserting data from 5 csv files into a MySQL database.
[Image: Spring Batch Partition – overview]

Technologies

– Java 1.8
– Maven 3.3.9
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.5.1.RELEASE
– MySQL Database

Steps to do

– Create Spring Boot project
– Create a simple model
– Create DAO class
– Create Batch Job Step

– Create Batch Job Partitioner
– Configure Partitioned Batch Job

– Create JobLaunchController
– Create 5 csv files
– Run & Check results

1. Create Spring Boot project

Create a Spring Boot project with needed dependencies:
– spring-boot-starter-batch
– spring-boot-starter-web
– mysql-connector-java

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

2. Create a simple model


package com.javasampleapproach.partitioning.model;

public class Customer {
	private int id;
	private String firstName;
	private String lastName;

	public Customer() {
	}

	public Customer(int id, String firstName, String lastName) {
		this.id = id;
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		return String.format("Customer[id=%d , firstName='%s', lastName='%s']", id, firstName, lastName);
	}
}

3. Create DAO class


package com.javasampleapproach.partitioning.dao;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.stereotype.Repository;

import com.javasampleapproach.partitioning.model.Customer;


@Repository
public class CustomerDao extends JdbcDaoSupport{

	@Autowired
	DataSource dataSource;

	@PostConstruct
	private void initialize() {
		setDataSource(dataSource);
	}

	public void insert(List<? extends Customer> customers) {
		String sql = "INSERT INTO customer (id, firstname, lastname) VALUES (?, ?, ?)";
		getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {
			public void setValues(PreparedStatement ps, int i) throws SQLException {
				Customer customer = customers.get(i);
				ps.setInt(1, customer.getId());
				ps.setString(2, customer.getFirstName());
				ps.setString(3, customer.getLastName());
			}

			public int getBatchSize() {
				return customers.size();
			}
		});
	}
}

4. Create Batch Job Step

– Create Reader.java:


package com.javasampleapproach.partitioning.step;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

import com.javasampleapproach.partitioning.model.Customer;


public class Reader{
	
	public static FlatFileItemReader<Customer> reader(String path) {

		FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

		reader.setResource(new ClassPathResource(path));
		reader.setLineMapper(new DefaultLineMapper<Customer>() {
			{
				setLineTokenizer(new DelimitedLineTokenizer() {
					{
						setNames(new String[] { "id", "firstName", "lastName" });
					}
				});
				setFieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {
					{
						setTargetType(Customer.class);
					}
				});
			}
		});
		return reader;
	}
	}
}

– Create Writer.java:


package com.javasampleapproach.partitioning.step;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.javasampleapproach.partitioning.dao.CustomerDao;
import com.javasampleapproach.partitioning.model.Customer;


public class Writer implements ItemWriter<Customer> {

	private final CustomerDao customerDao;

	public Writer(CustomerDao customerDao) {
		this.customerDao = customerDao;
	}

	@Override
	public void write(List<? extends Customer> customers) throws Exception {
		customerDao.insert(customers);
	}

}

– Create Processor.java:


package com.javasampleapproach.partitioning.step;

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

import com.javasampleapproach.partitioning.model.Customer;


public class Processor implements ItemProcessor<Customer, Customer> {

	private static final Logger log = LoggerFactory.getLogger(Processor.class);

	private String threadName;
	
	@Override
	public Customer process(Customer customer) throws Exception {
		Random r = new Random();
		
		final String firstName = customer.getFirstName().toUpperCase();
		final String lastName = customer.getLastName().toUpperCase();
		final Customer fixedCustomer = new Customer(r.nextInt(), firstName, lastName);

		log.info(threadName + " processing : "+ "Converting (" + customer + ") into (" + fixedCustomer + ")");
		
		return fixedCustomer;
	}

	public String getThreadName() {
		return threadName;
	}

	public void setThreadName(String threadName) {
		this.threadName = threadName;
	}
}

5. Create Batch Job Partitioner


package com.javasampleapproach.partitioning.partitioner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class SamplePartitioner implements Partitioner {

	Logger logger = LoggerFactory.getLogger(this.getClass());

	List<String> fileNameList = Arrays.asList("customer-data-1.csv", "customer-data-2.csv",
			      "customer-data-3.csv", "customer-data-4.csv", "customer-data-5.csv");
	
	
	
	@Override
	public Map<String, ExecutionContext> partition(int gridSize) {

		Map<String, ExecutionContext> partitionData = new HashMap<>();
		
		//
		// In the tutorial: gridSize = 5 for 5 Threads to read 5 files in fileNameList:
		// {"customer-data-1.csv", "customer-data-2.csv", "customer-data-3.csv", "customer-data-4.csv", "customer-data-5.csv"}
		//
		
		for (int i = 0; i < gridSize; i++) {
			ExecutionContext executionContext = new ExecutionContext();
			logger.info("Starting : Thread" + i);
			logger.info("File Name: " + fileNameList.get(i));
			
			// give fileName for ExecutionContext
			executionContext.putString("filename", fileNameList.get(i));
			// give a thread name for ExecutionContext
			executionContext.putString("name", "Thread" + i);
			
			partitionData.put("partition: " + i, executionContext);
		}
		
		return partitionData;
	}

}

6. Configure Partitioned Batch Job

– Create a batchjob.xml configuration file:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
	xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">

	<!-- partitioner job -->
	<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">

		<!-- master step, 5 threads (grid-size) -->
		<!-- In the tutorial: gridSize = 5 for 5 Threads to read 5 files in fileNameList: 
			{"customer-data-1.csv", "customer-data-2.csv", "customer-data-3.csv", "customer-data-4.csv", 
			"customer-data-5.csv"} -->
		<step id="masterStep">
			<partition step="slave" partitioner="partitioner">
				<handler grid-size="5" task-executor="taskExecutor" />
			</partition>
		</step>

	</job>

	<!-- each thread will run this job, with different stepExecutionContext 
		values. -->
	<step id="slave" xmlns="http://www.springframework.org/schema/batch">
		<tasklet>
			<chunk reader="itemReader" processor="itemProcessor" writer="itemWriter"
				commit-interval="1" />
		</tasklet>
	</step>

	<beans:bean id="partitioner"
		class="com.javasampleapproach.partitioning.partitioner.SamplePartitioner" />

	<beans:bean id="taskExecutor"
		class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

	<beans:bean id="itemReader"
		class="com.javasampleapproach.partitioning.step.Reader"
		factory-method="reader" scope="step">
		<beans:constructor-arg value="#{stepExecutionContext[filename]}" />
	</beans:bean>

	<beans:bean id="itemProcessor"
		class="com.javasampleapproach.partitioning.step.Processor" scope="step">
		<beans:property name="threadName" value="#{stepExecutionContext[name]}" />
	</beans:bean>

	<beans:bean id="itemWriter"
		class="com.javasampleapproach.partitioning.step.Writer" scope="step">
		<!-- Writer's constructor requires the CustomerDao bean (component-scanned from @Repository) -->
		<beans:constructor-arg ref="customerDao" />
	</beans:bean>

	<beans:bean class="org.springframework.batch.core.scope.StepScope" />
</beans:beans>

– Open the application.properties file and configure the DataSource info:


spring.datasource.url=jdbc:mysql://localhost:3306/testdb
spring.datasource.username=root
spring.datasource.password=12345
spring.batch.job.enabled=false

– In the main class, enable batch job processing and import the XML configuration:
@EnableBatchProcessing
@ImportResource("classpath:batchjob.xml")
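A minimal main class might then look like this (the class name is illustrative):

package com.javasampleapproach.partitioning;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@EnableBatchProcessing
// pulls the partitioned job defined in batchjob.xml into the Boot context
@ImportResource("classpath:batchjob.xml")
public class SpringBatchPartitioningApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringBatchPartitioningApplication.class, args);
	}
}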

7. Create JobLaunchController


package com.javasampleapproach.partitioning.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobLaunchController {
	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	Job job;

	@RequestMapping("/runjob")
	public String handle() throws Exception {
		Logger logger = LoggerFactory.getLogger(this.getClass());
		try {
			JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
					.toJobParameters();
			jobLauncher.run(job, jobParameters);
		} catch (Exception e) {
			logger.info(e.getMessage());
		}
		return "Done! Check Console for more details";
	}
}

8. Create 5 csv files

Create 5 csv files: {customer-data-1.csv, customer-data-2.csv, customer-data-3.csv, customer-data-4.csv, customer-data-5.csv},
each with Customer info:


0,Jack,Smith
1,Adam,Johnson
2,Kim,Smith
3,David,Williams
4,Peter,Davis

9. Run & Check results

– Build and run the project in Spring Boot App mode.
– Create the database table with the following SQL:


DROP TABLE IF EXISTS testdb.customer;

CREATE TABLE testdb.customer(
   id INT NOT NULL AUTO_INCREMENT,
   firstname VARCHAR(20) NOT NULL,
   lastname VARCHAR(20) NOT NULL,
   PRIMARY KEY (id)
);

– Then make a launch request: http://localhost:8080/runjob
– Result:


o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=partitionJob]] launched with the following parameters: [{time=1488521284866}]
o.s.batch.core.job.SimpleStepHandler     : Executing step: [masterStep]
c.j.p.partitioner.SamplePartitioner      : Starting : Thread0
c.j.p.partitioner.SamplePartitioner      : File Name: customer-data-1.csv
c.j.p.partitioner.SamplePartitioner      : Starting : Thread1
c.j.p.partitioner.SamplePartitioner      : File Name: customer-data-2.csv
c.j.p.partitioner.SamplePartitioner      : Starting : Thread2
c.j.p.partitioner.SamplePartitioner      : File Name: customer-data-3.csv
c.j.p.partitioner.SamplePartitioner      : Starting : Thread3
c.j.p.partitioner.SamplePartitioner      : File Name: customer-data-4.csv
c.j.p.partitioner.SamplePartitioner      : Starting : Thread4
c.j.p.partitioner.SamplePartitioner      : File Name: customer-data-5.csv
c.j.partitioning.step.Processor          : Thread3 processing : Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=-192502444 , firstName='JACK', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread1 processing : Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=234602813 , firstName='JACK', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread4 processing : Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=-1223820058 , firstName='JACK', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread0 processing : Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=1928594541 , firstName='JACK', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread4 processing : Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-1960470815 , firstName='ADAM', lastName='JOHNSON'])
c.j.partitioning.step.Processor          : Thread2 processing : Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=-1073860025 , firstName='JACK', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread3 processing : Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-1021472654 , firstName='ADAM', lastName='JOHNSON'])
c.j.partitioning.step.Processor          : Thread1 processing : Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=1592499918 , firstName='ADAM', lastName='JOHNSON'])
c.j.partitioning.step.Processor          : Thread3 processing : Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=964582701 , firstName='KIM', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread4 processing : Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=-1909631350 , firstName='KIM', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread0 processing : Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-2107041132 , firstName='ADAM', lastName='JOHNSON'])
c.j.partitioning.step.Processor          : Thread2 processing : Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-901650631 , firstName='ADAM', lastName='JOHNSON'])
c.j.partitioning.step.Processor          : Thread1 processing : Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=-1766222637 , firstName='KIM', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread4 processing : Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=832027145 , firstName='DAVID', lastName='WILLIAMS'])
c.j.partitioning.step.Processor          : Thread0 processing : Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=2139197581 , firstName='KIM', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread2 processing : Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=-1235773995 , firstName='KIM', lastName='SMITH'])
c.j.partitioning.step.Processor          : Thread3 processing : Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=-1499785668 , firstName='DAVID', lastName='WILLIAMS'])
c.j.partitioning.step.Processor          : Thread1 processing : Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=-217138188 , firstName='DAVID', lastName='WILLIAMS'])
c.j.partitioning.step.Processor          : Thread3 processing : Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=532635592 , firstName='PETER', lastName='DAVIS'])
c.j.partitioning.step.Processor          : Thread2 processing : Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=-508271177 , firstName='DAVID', lastName='WILLIAMS'])
c.j.partitioning.step.Processor          : Thread4 processing : Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=-1865852147 , firstName='PETER', lastName='DAVIS'])
c.j.partitioning.step.Processor          : Thread0 processing : Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=847463153 , firstName='DAVID', lastName='WILLIAMS'])
c.j.partitioning.step.Processor          : Thread1 processing : Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=-477473155 , firstName='PETER', lastName='DAVIS'])
c.j.partitioning.step.Processor          : Thread2 processing : Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=1813947415 , firstName='PETER', lastName='DAVIS'])
c.j.partitioning.step.Processor          : Thread0 processing : Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=259984490 , firstName='PETER', lastName='DAVIS'])
o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=partitionJob]] completed with the following parameters: [{time=1488521284866}] and the following status: [COMPLETED]

[Image: Spring Batch Partition – result]

III. SOURCE CODE

SpringBatchPartitioning



By grokonez | March 3, 2017.

Last updated on April 26, 2021.



13 thoughts on “Spring Batch Partition for Scaling & Parallel Processing”

  1. If we have 100 customer-data-*.csv files and we use grid-size = 100, there will be 100 threads, which is not practical. So what is the way to solve this issue, i.e. process many files with a limited number of threads?

    1. Hi Arpit Garg,

      The number of threads depends on your infrastructure, so you can configure it. The heart of the tutorial is how to use the PartitionHandler and understand the Master-Slave steps.
      Your problem can be resolved in a few ways (see the sketch after this reply):
      – Use 10 threads to process 100 files, each thread processing 10 files.
      – Use 5 threads to process 100 files, each thread handling 20 files.
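
      A minimal sketch of the first option (hypothetical code, not part of this tutorial's project): assign each partition a comma-separated list of files, then have a step-scoped reader iterate over its assigned files, e.g. with a MultiResourceItemReader.

      // Hypothetical: distribute fileNameList across gridSize partitions round-robin.
      @Override
      public Map<String, ExecutionContext> partition(int gridSize) {
          Map<String, ExecutionContext> partitions = new HashMap<>();
          for (int i = 0; i < gridSize; i++) {
              partitions.put("partition" + i, new ExecutionContext());
          }
          // file k goes to partition k % gridSize
          for (int k = 0; k < fileNameList.size(); k++) {
              ExecutionContext ctx = partitions.get("partition" + (k % gridSize));
              String prefix = ctx.containsKey("filenames") ? ctx.getString("filenames") + "," : "";
              ctx.putString("filenames", prefix + fileNameList.get(k));
          }
          return partitions;
      }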

      1. I have to implement the scenario below:
        Use 10 threads to process 100 files, each thread processing 10 files.

        Can you suggest any solution for that?

  2. How to verify that the “partitioner” execution (all slaves have completed) is finished before going to the next step?

    1. Hi,

      For monitoring the executions of all partitions, I suggest logging timestamps in the Partitioner and in each step; then you can verify the order of the executing flows.
      Note also that the master step does not complete until all of its slave step executions have finished, so any step placed after masterStep runs only after all slaves are done. A quick way to see the timing is a step listener, sketched below.
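
      A sketch of such a listener (hypothetical; attach it to the slave step):

      import org.springframework.batch.core.StepExecution;
      import org.springframework.batch.core.listener.StepExecutionListenerSupport;

      public class TimingListener extends StepExecutionListenerSupport {
          // logs when each slave step execution starts
          @Override
          public void beforeStep(StepExecution stepExecution) {
              System.out.println(stepExecution.getStepName() + " started at " + stepExecution.getStartTime());
          }
      }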

  3. Hi Team,
    I have implemented something similar with a batch process in Spring Boot using annotations; it works as a REST service for me.
    I am using SimpleAsyncTaskExecutor with a reader, processor and writer to read from the db, validate in the processor, and insert into the database.
    When one job is launched and I give one more job to run, the second batch fails.
    I am getting all parameters dynamically from the web service.
    simpleJobLauncher.run(itemDataQualtiyReportJob, jobParameters);

    Thanks in advance.

    Below is the exception.

    2018-07-12 19:21:23.015 ERROR 24304 — [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job itemDataQualtiyReport

    org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:149)
    at org.springframework.batch.item.database.JdbcPagingItemReader.open(JdbcPagingItemReader.java:260)
    at org.springframework.batch.item.database.JdbcPagingItemReader$$FastClassBySpringCGLIB$$42c8e250.invoke()
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at com.unilog.cimm.reports.batch.reader.Cimm2ItemReader$$EnhancerBySpringCGLIB$$6a6d34be.open()
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
    at org.springframework.batch.core.step.item.ChunkMonitor.open(ChunkMonitor.java:114)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:66)
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:67)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:136)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:308)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:141)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Cannot open an already opened ItemReader, call close first
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.batch.item.database.AbstractPagingItemReader.doOpen(AbstractPagingItemReader.java:133)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:146)
    … 23 common frames omitted

  4. Hi,

    I am using Spring Batch for reading a csv file and inserting into the db. A single file is getting inserted automatically.
    But I need to read two csv files and insert them into the database simultaneously.

    Thanks in advance.

    Regards,

    Prabhu

  5. Hi,
    I have one question regarding the number of instances created for the Reader, Processor and Writer.
    Is one created for each thread?
    Or is there basically a single instance of Reader, Processor and Writer shared by all threads with different input parameters?
    Please respond.
    Thanks

  6. Can you implement partitioning for reading records from a single csv file? The scenario is that I need a partitioner that reads 20 records at a time from a single csv; the file has 100 records.
    The file should always be the same.
