Spring Batch is a powerful module for conveniently implementing batch processing over large amounts of data.
This tutorial guides you through importing CSV data into a PostgreSQL database using a Spring Batch job.
Related Articles:
– How to start with Spring Batch using Spring Boot
– How to use Spring JDBC Template for Postgres Database
I. Technology
– Java 1.8
– Maven 3.3.9
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.4.0.RELEASE
II. Overview
1. Goal
To build a simple application that shows how we can use a Spring Batch Job Step (including ItemReader, ItemProcessor, ItemWriter and JobExecutionListener) to read Customer data from a CSV file, then save it to a PostgreSQL table named ‘customer’.
– Reader: reads the content of the CSV file, then maps the data to the fields of the Customer data model.
– Processor: converts each Customer record’s content to new content (for example, assigning a random ID and uppercasing the name Strings) which will be written to the database table.
– Writer: writes each batch of records to the PostgreSQL database using the DAO.
– Listener: runs after the Step finishes, reading data back from the database table to verify the result and show LOGs.
– RestController: runs the Job using JobLauncher, then returns a completion String to the client.
– DAO: interacts with the database.
2. Project Structure
3. Step by step
– Create Spring Boot project
– Configure application properties
– Create a DataModel
– Create a DAO
– Create Job Step: Reader, Processor, Writer, Listener
– Create Batch Configuration
– Create a WebController
– Run Spring Boot Application & Enjoy Result
III. Practice
1. Create Spring Boot project
– Open Spring Tool Suite; on the menu, choose File -> New -> Spring Starter Project, then fill in each field:
Click Next, then under:
– I/O: choose Batch
– SQL: choose JDBC and PostgreSQL
– Web: choose Web
Click Finish. The Spring Boot project will be created.
2. Configure application properties, add SQL script file & CSV file
– Add configuration for the datasource and disable Spring Batch Job auto-run in application.properties (located in src/main/resources):
spring.datasource.url=jdbc:postgresql://localhost:5432/testcsvdb
spring.datasource.username=postgres
spring.datasource.password=123
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
– Under src/main/resources, add:
+ schema-postgresql.sql
DROP TABLE IF EXISTS customer;

CREATE TABLE customer (
    id BIGSERIAL PRIMARY KEY NOT NULL,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);
+ customer-data.csv
0,Jack,Smith
1,Adam,Johnson
2,Kim,Smith
3,David,Williams
4,Peter,Davis
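For intuition, here is a minimal plain-Java sketch (not Spring Batch code; the class name is illustrative) of what the Reader's DelimitedLineTokenizer does with each of these lines: split on commas, yielding the fields that later map onto a Customer's id, firstName and lastName.

```java
// Illustrative sketch of comma tokenizing, mirroring what Spring Batch's
// DelimitedLineTokenizer does by default for each CSV line.
public class CsvLineSketch {

    // Split one CSV line into its fields (comma is the default delimiter)
    public static String[] tokenize(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        String[] fields = tokenize("0,Jack,Smith");
        // fields map onto Customer in order: id, firstName, lastName
        System.out.println("id=" + fields[0]
                + ", firstName=" + fields[1]
                + ", lastName=" + fields[2]);
        // prints: id=0, firstName=Jack, lastName=Smith
    }
}
```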
3. Create a DataModel
Under package model, create class Customer.
Content of Customer.java:
package com.javasampleapproach.batchcsvpostgresql.model;

public class Customer {
    private long id;
    private String firstName;
    private String lastName;

    public Customer() {
    }

    public Customer(long id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return String.format("Customer[id=%d , firstName='%s', lastName='%s']", id, firstName, lastName);
    }
}
4. Create a DAO
– Under package dao, create interface CustomerDao:
package com.javasampleapproach.batchcsvpostgresql.dao;

import java.util.List;

import com.javasampleapproach.batchcsvpostgresql.model.Customer;

public interface CustomerDao {
    void insert(List<? extends Customer> customers);

    List<Customer> loadAllCustomers();
}
– Under package dao.impl, create implementation of CustomerDao – CustomerDaoImpl:
package com.javasampleapproach.batchcsvpostgresql.dao.impl;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.stereotype.Repository;

import com.javasampleapproach.batchcsvpostgresql.dao.CustomerDao;
import com.javasampleapproach.batchcsvpostgresql.model.Customer;

@Repository
public class CustomerDaoImpl extends JdbcDaoSupport implements CustomerDao {

    @Autowired
    DataSource dataSource;

    @PostConstruct
    private void initialize() {
        setDataSource(dataSource);
    }

    @Override
    public void insert(List<? extends Customer> customers) {
        String sql = "INSERT INTO customer (id, first_name, last_name) VALUES (?, ?, ?)";
        getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {

            public void setValues(PreparedStatement ps, int i) throws SQLException {
                Customer customer = customers.get(i);
                ps.setLong(1, customer.getId());
                ps.setString(2, customer.getFirstName());
                ps.setString(3, customer.getLastName());
            }

            public int getBatchSize() {
                return customers.size();
            }
        });
    }

    @Override
    public List<Customer> loadAllCustomers() {
        String sql = "SELECT * FROM customer";
        List<Map<String, Object>> rows = getJdbcTemplate().queryForList(sql);

        List<Customer> result = new ArrayList<Customer>();
        for (Map<String, Object> row : rows) {
            Customer customer = new Customer();
            customer.setId((Long) row.get("id"));
            customer.setFirstName((String) row.get("first_name"));
            customer.setLastName((String) row.get("last_name"));
            result.add(customer);
        }

        return result;
    }
}
5. Create Job Step: Reader, Processor, Writer, Listener
Under package step:
Reader.java:
package com.javasampleapproach.batchcsvpostgresql.step;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

import com.javasampleapproach.batchcsvpostgresql.model.Customer;

public class Reader {

    public static FlatFileItemReader<Customer> reader(String path) {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();
        reader.setResource(new ClassPathResource(path));
        reader.setLineMapper(new DefaultLineMapper<Customer>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[] { "id", "firstName", "lastName" });
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {
                    {
                        setTargetType(Customer.class);
                    }
                });
            }
        });
        return reader;
    }
}
Processor.java:
package com.javasampleapproach.batchcsvpostgresql.step;

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

import com.javasampleapproach.batchcsvpostgresql.model.Customer;

public class Processor implements ItemProcessor<Customer, Customer> {

    private static final Logger log = LoggerFactory.getLogger(Processor.class);

    @Override
    public Customer process(Customer customer) throws Exception {
        Random r = new Random();

        final String firstName = customer.getFirstName().toUpperCase();
        final String lastName = customer.getLastName().toUpperCase();
        final Customer fixedCustomer = new Customer(r.nextLong(), firstName, lastName);

        log.info("Converting (" + customer + ") into (" + fixedCustomer + ")");

        return fixedCustomer;
    }
}
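The transformation itself is plain Java: uppercase both names and assign a random id. A minimal standalone sketch (class and method names here are illustrative, not part of the project) also explains the negative ids you will see in the log output later, since Random.nextLong() can return negative values:

```java
import java.util.Random;

// Standalone sketch of the Processor's per-item transformation,
// outside of Spring Batch.
public class ProcessorSketch {

    // Uppercase a name, as the Processor does for firstName and lastName
    public static String transformName(String name) {
        return name.toUpperCase();
    }

    public static void main(String[] args) {
        long id = new Random().nextLong(); // may be negative, as in the tutorial's log
        System.out.println(id + "," + transformName("Jack") + "," + transformName("Smith"));
    }
}
```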
Writer.java:
package com.javasampleapproach.batchcsvpostgresql.step;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.javasampleapproach.batchcsvpostgresql.dao.CustomerDao;
import com.javasampleapproach.batchcsvpostgresql.model.Customer;

public class Writer implements ItemWriter<Customer> {

    private final CustomerDao customerDao;

    public Writer(CustomerDao customerDao) {
        this.customerDao = customerDao;
    }

    @Override
    public void write(List<? extends Customer> customers) throws Exception {
        customerDao.insert(customers);
    }
}
Listener.java:
package com.javasampleapproach.batchcsvpostgresql.step;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

import com.javasampleapproach.batchcsvpostgresql.dao.CustomerDao;
import com.javasampleapproach.batchcsvpostgresql.model.Customer;

public class Listener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    private final CustomerDao customerDao;

    public Listener(CustomerDao customerDao) {
        this.customerDao = customerDao;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Finish Job! Check the results");

            List<Customer> customers = customerDao.loadAllCustomers();
            for (Customer customer : customers) {
                log.info("Found <" + customer + "> in the database.");
            }
        }
    }
}
JobExecutionListenerSupport is a convenience implementation of JobExecutionListener.
We can run our own logic before a Job starts (by overriding the beforeJob method) and after it ends, whether it completed normally or failed (by overriding the afterJob method).
The annotations corresponding to this interface are @BeforeJob and @AfterJob; the step-level equivalents are @BeforeStep and @AfterStep on a StepExecutionListener.
6. Create Batch Configuration
Under package config, create BatchConfig.java:
package com.javasampleapproach.batchcsvpostgresql.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.javasampleapproach.batchcsvpostgresql.dao.CustomerDao;
import com.javasampleapproach.batchcsvpostgresql.model.Customer;
import com.javasampleapproach.batchcsvpostgresql.step.Listener;
import com.javasampleapproach.batchcsvpostgresql.step.Processor;
import com.javasampleapproach.batchcsvpostgresql.step.Reader;
import com.javasampleapproach.batchcsvpostgresql.step.Writer;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public CustomerDao customerDao;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .listener(new Listener(customerDao))
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1").<Customer, Customer>chunk(2)
                .reader(Reader.reader("customer-data.csv"))
                .processor(new Processor())
                .writer(new Writer(customerDao))
                .build();
    }
}
The parameter passed to the chunk method of StepBuilder specifies the number of items to read and process before the batch is written out via the ItemWriter.
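A standalone sketch of this chunk-oriented behavior (plain Java, not Spring Batch; names are illustrative): items are read one at a time into a buffer, and every time the buffer reaches the chunk size the whole group is handed to the writer. With 5 customers and chunk size 2, that means 3 writes of sizes 2, 2 and 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative simulation of chunk-oriented processing.
public class ChunkSketch {

    // Count how many times the "writer" would be invoked for the given
    // items and chunk size.
    public static int countWrites(List<String> items, int chunkSize) {
        int writes = 0;
        List<String> buffer = new ArrayList<>();
        for (String item : items) {
            buffer.add(item);             // "read" + "process" one item
            if (buffer.size() == chunkSize) {
                writes++;                 // flush a full chunk to the writer
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            writes++;                     // final partial chunk
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> customers = Arrays.asList("Jack", "Adam", "Kim", "David", "Peter");
        System.out.println(countWrites(customers, 2)); // prints 3
    }
}
```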
7. Create a WebController
Under package controller, create WebController.java:
package com.javasampleapproach.batchcsvpostgresql.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WebController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/runjob")
    public String handle() throws Exception {
        Logger logger = LoggerFactory.getLogger(this.getClass());

        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }

        return "Done! Check Console Window for more details";
    }
}
8. Run Spring Boot Application & Enjoy Result
– Configure the Maven build with goals:
clean install
– Run the project in Spring Boot App mode
– Check the results by accessing:
http://localhost:8080/runjob
The console shows:
Job: [FlowJob: [name=job]] launched with the following parameters: [{time=1474771650234}]
Executing step: [step1]
Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=6323287188840625065 , firstName='JACK', lastName='SMITH'])
Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-5061932588533513687 , firstName='ADAM', lastName='JOHNSON'])
Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=250312719692371085 , firstName='KIM', lastName='SMITH'])
Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=2361081371024882848 , firstName='DAVID', lastName='WILLIAMS'])
Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=-8239802131547868532 , firstName='PETER', lastName='DAVIS'])
Finish Job! Check the results
Found <Customer[id=6323287188840625065 , firstName='JACK', lastName='SMITH']> in the database.
Found <Customer[id=-5061932588533513687 , firstName='ADAM', lastName='JOHNSON']> in the database.
Found <Customer[id=250312719692371085 , firstName='KIM', lastName='SMITH']> in the database.
Found <Customer[id=2361081371024882848 , firstName='DAVID', lastName='WILLIAMS']> in the database.
Found <Customer[id=-8239802131547868532 , firstName='PETER', lastName='DAVIS']> in the database.
Job: [FlowJob: [name=job]] completed with the following parameters: [{time=1474771650234}] and the following status: [COMPLETED]
Open pgAdmin III to check table customer in database testcsvdb:
IV. Source Code
Last updated on June 18, 2017.
This problem is happening:
PreparedStatementCallback; bad SQL grammar [INSERT into BATCH_JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED, JOB_CONFIGURATION_LOCATION) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)]; nested exception is org.postgresql.util.PSQLException: ERROR: column "job_configuration_location" of relation "batch_job_execution" does not exist
Position: 159
Hi Joe,
Please check your development environment again.
In my careful testing, it works fine.
You can check more with the article (includes guideline video):
How to start with Spring Batch using Spring Boot – Java Config
Thanks & Regards,
How do I write unit tests and integration tests for this project? I have created a similar project and am stuck writing the test cases. Please help.
Could you please post on how to export a PostgreSQL database to CSV in the same manner, using the DAO concepts?
How can I send a response after the JobExecution completes, given that the Job runs asynchronously?
How can I dynamically specify the path of the CSV file?