How to partition steps in Spring Batch?

I am learning Spring Batch and wrote a simple application to play with it.
According to my requirements, I read from a single CSV file, do some transformations, and insert the results into a database.

I have the following configuration:

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_BEFORE!!!!!!!!!!!!!!!!");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println("!!!!!!!!!!!!!SECOND_LISTENER_AFTER!!!!!!!!!!!!!!!!");

                    }
                })
                .flow(step1)
                .next(step2)
                .end()
                .build();
    }

    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReader")
                .resource(csvResource)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }

Now I want to make that step execute using 10 threads. As far as I understand, I need to use the partitioning feature for that. I’ve found several examples, but they use XML configuration, and I prefer Java configuration.

How can I achieve it?

P.S.

I tried the following approach:

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        TaskletStep step1 = stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .taskExecutor(taskExecutor)
                .build();

        return step1;
    }

But my application hangs. Moreover, this isn’t partitioning, and it would only work on a single machine.
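The question doesn't show why the application hangs, but the attempt contains known pitfalls for a multi-threaded step: a core pool size of 1 gives only one worker thread, a ThreadPoolTaskExecutor created with `new` outside the Spring container is not usable until `initialize()` is called, and FlatFileItemReader is not thread-safe. A minimal sketch of a multi-threaded (not partitioned) step that addresses these, assuming Spring Batch 4.x (the StepBuilderFactory API used in the question), the question's `Person` class and `JdbcBatchItemWriter<Person>` bean, and a hypothetical `classpath:people.csv` location, could look like this. `MultiThreadedStepConfig` and `stepTaskExecutor` are illustrative names, and this `step1` would replace the question's one.

```java
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class MultiThreadedStepConfig { // hypothetical class name

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Value("classpath:people.csv") // hypothetical location of the CSV file
    private Resource csvResource;

    @Bean
    public TaskExecutor stepTaskExecutor() {
        // Declared as a @Bean, so Spring calls initialize(); an instance created
        // with "new" outside the container would need executor.initialize().
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // a core size of 1 means a single worker thread
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("step1-");
        return executor;
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        BeanWrapperFieldSetMapper<Person> mapper = new BeanWrapperFieldSetMapper<>();
        mapper.setTargetType(Person.class);
        FlatFileItemReader<Person> delegate = new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReader")
                .resource(csvResource)
                .saveState(false) // read positions are not restartable when threads interleave
                .delimited()
                .names("firstName", "lastName")
                .fieldSetMapper(mapper)
                .build();

        // FlatFileItemReader is not thread-safe, so serialize calls to read()
        SynchronizedItemStreamReader<Person> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(delegate);

        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader)
                .writer(writer) // the question's .processor(processor()) can be added back here
                .taskExecutor(stepTaskExecutor())
                .throttleLimit(10) // the default allows only 4 concurrent chunks
                .build();
    }
}
```

Like the original attempt, this still runs in a single JVM: the 10 threads share the reading, processing and writing of chunks, and restart state is switched off because concurrent reads make a saved read position meaningless.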

Jason

You can use the code below to implement batch partitioning.

    @Configuration
    public class DemoJobBatchConfiguration {

        private static final Logger LOGGER = LoggerFactory.getLogger(DemoJobBatchConfiguration.class);

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        @Qualifier("applicaionDS")
        public DataSource dataSource;

        @Autowired
        UserWritter userWriter;

        @Bean("demoJob")
        public Job partitionJob(JobNotificationListener listener, JobBuilderFactory jobBuilderFactory,
                                @Qualifier("demoPartitionStep") Step demoPartitionStep) {
            return jobBuilderFactory.get("demoJob")
                    .incrementer(new RunIdIncrementer())
                    .listener(listener)
                    .start(demoPartitionStep)
                    .build();
        }

        @Bean(name = "demoPartitionStep")
        public Step demoPartitionStep(Step demoSlaveStep, StepBuilderFactory stepBuilderFactory) {
            return stepBuilderFactory.get("demoPartitionStep")
                    .partitioner("demoPartitionStep", demoPartitioner())
                    .gridSize(21)
                    .step(demoSlaveStep)
                    .taskExecutor(jobTaskExecutor())
                    .build();
        }

        @Bean(name = "demoPartitioner", destroyMethod = "")
        public Partitioner demoPartitioner() {
            DemoPartitioner partitioner = new DemoPartitioner();
            // partitioner.partition(20);
            return partitioner;
        }

        @Bean
        public Step demoSlaveStep(ItemReader<User> demoReader, ItemProcessor<User, User> demoJobProcessor) {
            return stepBuilderFactory.get("demoSlaveStep")
                    .<User, User>chunk(3)
                    .reader(demoReader)…
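In this comment, `gridSize(21)` is the value Spring Batch hands to the partitioner's `partition(int gridSize)` method; the partitioner itself decides how many ExecutionContext entries it actually returns. As a framework-free sketch of the arithmetic such a method might do for a file of N data lines, the hypothetical `LineRanges.split` below divides the lines into at most `gridSize` contiguous ranges, giving the first `N % gridSize` ranges one extra line. Each map value (`{startLine, lineCount}`) stands in for what a real partitioner would put into an ExecutionContext; the names are illustrative, not Spring Batch API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits totalLines data lines into at most gridSize contiguous ranges. */
public class LineRanges {

    /**
     * Returns partition name -> {startLine, lineCount}, with startLine 0-based.
     * The first (totalLines % gridSize) partitions get one extra line.
     */
    public static Map<String, int[]> split(int totalLines, int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive");
        }
        Map<String, int[]> ranges = new LinkedHashMap<>();
        int base = totalLines / gridSize;
        int remainder = totalLines % gridSize;
        int start = 0;
        for (int i = 0; i < gridSize; i++) {
            int count = base + (i < remainder ? 1 : 0);
            if (count == 0) {
                break; // fewer lines than partitions: later ranges would also be empty
            }
            ranges.put("partition" + i, new int[] {start, count});
            start += count;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Print each partition's range for 25 lines and a grid size of 10
        split(25, 10).forEach((name, r) ->
                System.out.println(name + ": start=" + r[0] + ", count=" + r[1]));
    }
}
```

For 25 lines and a grid size of 10, partitions 0 to 4 cover 3 lines each and partitions 5 to 9 cover 2 lines each, so every line is assigned exactly once; with only 3 lines, just 3 partitions are returned.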

Jason

Your configuration is wrong. Follow the configuration below. You need to decide the logic by which you want to partition the data. Look at how the partitioner's partition method creates a map and adds an ExecutionContext for each partition. Follow the code below:

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        // The partitioner builder produces a partitioned Step (not a TaskletStep), so return it directly
        return stepBuilderFactory.get("partionerStep")
                .partitioner("slaveStep", partitioner())
                .step(slaveStep())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public CustomPartitioner partitioner() {
        CustomPartitioner partitioner = new CustomPartitioner();
        return partitioner;
    }

    public class CustomPartitioner implements Partitioner {
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            Map<String, ExecutionContext> map = new HashMap<>(gridSize);
            int i = 0, k = 1;
            for (Resource resource : resources)…
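Because the comment's partitioner is cut off, here is a hedged, self-contained sketch of the same idea for the question's single CSV file, assuming Spring Batch 4.x, the question's `Person` class and `JdbcBatchItemWriter<Person>` bean, a CSV without a header line, and a hypothetical `classpath:people.csv`. `LineRangePartitioner`, `partitionedReader`, `workerStep`, `partitionedStep` and the `startLine`/`lineCount` keys are illustrative names, not part of Spring Batch. The partitioner counts the lines and puts a start line and a line count into one ExecutionContext per partition; a `@StepScope` reader then reads only its range via `linesToSkip` and `maxItemCount`, and the manager step runs the worker step once per partition on a SimpleAsyncTaskExecutor.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
public class PartitionedStepConfig { // hypothetical class name
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Value("classpath:people.csv") // hypothetical location of the CSV file
    private Resource csvResource;

    public static class LineRangePartitioner implements Partitioner { // one context per line range
        private final Resource resource;
        public LineRangePartitioner(Resource resource) {
            this.resource = resource;
        }

        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            int totalLines = countLines();
            int size = (totalLines + gridSize - 1) / gridSize; // ceiling division
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
            for (int i = 0, start = 0; start < totalLines; i++, start += size) {
                ExecutionContext context = new ExecutionContext();
                context.putInt("startLine", start);
                context.putInt("lineCount", Math.min(size, totalLines - start));
                partitions.put("partition" + i, context);
            }
            return partitions;
        }
        private int countLines() {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) {
                return (int) in.lines().count();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    @Bean
    @StepScope // a new reader per partition, configured from that partition's context
    public FlatFileItemReader<Person> partitionedReader(
            @Value("#{stepExecutionContext['startLine']}") Integer startLine,
            @Value("#{stepExecutionContext['lineCount']}") Integer lineCount) {
        BeanWrapperFieldSetMapper<Person> mapper = new BeanWrapperFieldSetMapper<>();
        mapper.setTargetType(Person.class);
        return new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReader")
                .resource(csvResource)
                .linesToSkip(startLine)  // skip lines owned by earlier partitions
                .maxItemCount(lineCount) // stop after this partition's share
                .delimited()
                .names("firstName", "lastName")
                .fieldSetMapper(mapper)
                .build();
    }

    @Bean
    public Step workerStep(FlatFileItemReader<Person> partitionedReader,
                           JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("workerStep")
                .<Person, Person>chunk(10)
                .reader(partitionedReader)
                .writer(writer) // the question's .processor(processor()) can be added here
                .build();
    }

    @Bean
    public Step partitionedStep(@Qualifier("workerStep") Step workerStep) {
        return stepBuilderFactory.get("partitionedStep")
                .partitioner("workerStep", new LineRangePartitioner(csvResource))
                .step(workerStep)
                .gridSize(10) // passed to partition(gridSize)
                .taskExecutor(new SimpleAsyncTaskExecutor("partition-")) // one thread per partition
                .build();
    }
}
```

To use it, the question's job would run `partitionedStep` instead of `step1`. Each partition still reads and discards the lines before its range, so that cost grows with the file size. All partitions run as threads in one JVM here (local partitioning); spreading them across machines would need a remote PartitionHandler instead of the TaskExecutor, such as the one Spring Batch Integration provides.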