SpringBatch - Partitioning

2 분 소요

개요

  • Spring Batch에서 제공하는 Scaling 전략(strategy)은 아래 4가지로 정리할 수 있음.
| Strategy | Description | Local/Remote |
|---|---|---|
| Multithreaded step | A step is multithreaded | Local |
| Parallel step | Executes steps in parallel using multithreading | Local |
| Remote chunking | Distributes chunk processing to remote nodes | Remote |
| Partitioning step | Partitions data and splits up processing | Local and Remote |

출처: Spring Batch in Action (2011)

  • 이 전략 중에서 Partitioning 구현 예제를 작성함

config

  • config 객체를 작성함
  • config 객체는 partitioning batch 구동에 필요한 기본적인 빈 객체를 생성함
  • 기본적인 빈 객체로는 Job(partitionJob), Master step(partitionStep), Slave step(workerStep), Partitioner, PartitionHandler, TaskExecutor가 있음
  • 각 객체에 대한 설명은 구현 소스에 주석 형태로 입력함

  • 구현 클래스: PartitionBatchConfig

    PartitionBatchConfig 클래스 접기/펼치기 버튼
    /**
     * Spring Batch configuration for a locally partitioned job.
     *
     * <p>Creates the job ({@code partitionJob}), the manager step ({@code partitionStep}), the worker
     * step ({@code workerStep}), the {@link Partitioner}, the {@link TaskExecutorPartitionHandler}
     * and the {@link TaskExecutor} that runs the worker partitions concurrently.
     *
     * <p>Properties (defaults in parentheses): {@code chunkSize} (100) - commit interval of the
     * worker step; {@code poolSize} (10) - thread-pool size and partition grid size;
     * {@code minValue} (1) / {@code maxValue} (100) - inclusive value range split by the partitioner.
     */
    @Configuration
    @RequiredArgsConstructor
    @EnableBatchProcessing
    public class PartitionBatchConfig {
        public final JobBuilderFactory jobBuilderFactory;
        public final StepBuilderFactory stepBuilderFactory;
        // NOTE(review): partitions only get distinct data if this reader is step-scoped and reads
        // "minValue"/"maxValue" from the step ExecutionContext - confirm in the reader's bean definition.
        public final ItemReader<User> itemReader;
        public final ItemProcessor<User, User> itemProcessor;
        public final ItemWriter<User> itemWriter;

        private int chunkSize;

        @Value("${chunkSize:100}")
        public void setChunkSize(int chunkSize) {
            this.chunkSize = chunkSize;
        }

        private int poolSize;

        @Value("${poolSize:10}")
        public void setPoolSize(int poolSize) {
            this.poolSize = poolSize;
        }

        private int minValue;

        @Value("${minValue:1}")
        public void setMinValue(int minValue) {
            this.minValue = minValue;
        }

        private int maxValue;

        @Value("${maxValue:100}")
        public void setMaxValue(int maxValue) {
            this.maxValue = maxValue;
        }

        /**
         * The partitioned job. A {@link RunIdIncrementer} lets the job be relaunched with new
         * parameters.
         */
        @Bean(name = "partitionJob")
        public Job partitionJob() {
            return jobBuilderFactory.get("partitionJob")
                    .incrementer(new RunIdIncrementer())
                    .start(partitionStep())
                    .build();
        }

        /**
         * Manager (master) step: splits the work with {@link #partitioner()} and delegates the
         * execution of each partition to {@link #partitionHandler()}.
         */
        @Bean(name = "partitionStep")
        public Step partitionStep() {
            return stepBuilderFactory.get("partitionStep")
                    // NOTE(review): the first argument of partitioner(..) names the worker step whose
                    // executions are created per partition; "workerStep" may be intended - confirm
                    // before changing, since it alters step names stored in the job repository.
                    .partitioner("partitionStep", partitioner())
                    .partitionHandler(partitionHandler())
                    .build();
        }

        /**
         * Creates one ExecutionContext per worker (slave) partition.
         *
         * <p>The number of contexts is driven by the gridSize supplied by the PartitionHandler.
         * ColumnRangePartitioner uses the minimum and maximum of the data to process together with
         * gridSize to set the range values ({@code minValue}, {@code maxValue}) of each context.
         *
         * @return the range partitioner over [{@code minValue}, {@code maxValue}]
         */
        @Bean
        public Partitioner partitioner() {
            return new ColumnRangePartitioner(minValue, maxValue);
        }

        /**
         * Worker (slave) step executed once per partition: reads, processes and writes {@link User}
         * items in chunks of {@code chunkSize}.
         */
        @Bean(name = "workerStep")
        public Step workerStep() {
            return stepBuilderFactory.get("workerStep")
                    // Commit interval comes from the chunkSize property instead of a hard-coded value.
                    .<User, User>chunk(chunkSize)
                    .reader(itemReader)
                    .processor(itemProcessor)
                    .writer(itemWriter)
                    .build();
        }

        /**
         * Defines how the manager step runs the worker step.
         *
         * <p>Configures the worker step, the grid size (target partition count passed to the
         * partitioner) and the task executor that runs partitions on multiple threads. The grid size
         * equals {@code poolSize}, so each partition can run on its own pool thread.
         *
         * @return the local, thread-pool based partition handler
         */
        @Bean(name = "partitionHandler")
        public TaskExecutorPartitionHandler partitionHandler() {
            TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
            handler.setGridSize(poolSize);
            handler.setTaskExecutor(taskExecutor());
            handler.setStep(workerStep());
            return handler;
        }

        /**
         * Bounded thread pool ({@code poolSize} core and max threads) so that no more than the
         * configured number of threads are created for the worker partitions.
         *
         * @return the partition task executor
         */
        @Bean(name = "taskExecutor")
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(poolSize);
            executor.setMaxPoolSize(poolSize);
            executor.setThreadNamePrefix("partition-thread");
            executor.setWaitForTasksToCompleteOnShutdown(true);
            // No explicit initialize(): as a Spring bean the container calls afterPropertiesSet(),
            // which initializes the pool. Calling it here too would build the executor twice.
            return executor;
        }
    }
    

batch

  • ColumnRangePartitioner

    ColumnRangePartitioner 클래스 접기/펼치기 버튼
    package com.techprimers.springbatchexample1.batch.partition;
      
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.partition.support.Partitioner;
    import org.springframework.batch.item.ExecutionContext;
      
    import java.util.HashMap;
    import java.util.Map;
      
    /**
     * Splits the inclusive integer range [{@code min}, {@code max}] into contiguous sub-ranges,
     * producing one {@link ExecutionContext} per sub-range.
     *
     * <p>Each context stores its inclusive bounds under the keys {@code "minValue"} and
     * {@code "maxValue"}, and is registered under the key {@code "partition<N>"} with N starting at 0.
     */
    @Slf4j
    public class ColumnRangePartitioner implements Partitioner {
        // Public and mutable for backward compatibility; partition() reads them once per call.
        public int min;
        public int max;

        public ColumnRangePartitioner(int min, int max) {
            this.min = min;
            this.max = max;
        }

        /**
         * Partitions [{@code min}, {@code max}] into at most {@code gridSize} contiguous ranges of
         * equal size; the last range is clamped to {@code max}. Returns an empty map if
         * {@code min > max}.
         *
         * @param gridSize target number of partitions; must be at least 1
         * @return partition name to execution context holding {@code minValue}/{@code maxValue}
         * @throws IllegalArgumentException if {@code gridSize < 1}
         */
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            if (gridSize < 1) {
                throw new IllegalArgumentException("gridSize must be at least 1: " + gridSize);
            }
            // long arithmetic: (max - min) and (start + targetSize) can overflow int near the
            // Integer bounds, which would corrupt the ranges or keep the loop from terminating.
            long lower = min;
            long upper = max;
            long targetSize = (upper - lower) / gridSize + 1;

            Map<String, ExecutionContext> result = new HashMap<>();
            int number = 0;
            for (long start = lower; start <= upper; start += targetSize) {
                // Clamp before logging so the last partition's real bounds are reported.
                long end = Math.min(start + targetSize - 1, upper);
                log.info("start: {}, end: {}", start, end);

                ExecutionContext value = new ExecutionContext();
                // Both bounds lie within [min, max], so narrowing back to int is lossless.
                value.putInt("minValue", (int) start);
                value.putInt("maxValue", (int) end);
                result.put("partition" + number, value);
                number++;
            }

            return result;
        }
    }
    

controller

  • PartitionController

    PartitionController 클래스 접기/펼치기 버튼
    package com.techprimers.springbatchexample1.controller;
      
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
      
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.Map;
      
    /**
     * REST endpoint that launches the {@code partitionJob} on demand and reports its final status.
     */
    @Slf4j
    @RestController
    public class PartitionController {

        /** Delay between status checks while the launched job is still running. */
        private static final long POLL_INTERVAL_MS = 1000L;

        @Autowired
        JobLauncher jobLauncher;

        @Resource(name = "partitionJob")
        Job job;

        /**
         * Launches the partition job and waits until it stops running.
         *
         * @return the job execution status when this method returns. Normally this is the final
         *     status; if the waiting thread is interrupted it may still be a running status.
         * @throws JobParametersInvalidException if the job rejects the parameters
         * @throws JobExecutionException if the launcher cannot start the job
         */
        @GetMapping("partition/load")
        public BatchStatus load() throws JobParametersInvalidException, JobExecutionException {
            Map<String, JobParameter> maps = new HashMap<>();
            // A fresh timestamp makes the parameters unique per request, so the job can be rerun.
            maps.put("time", new JobParameter(System.currentTimeMillis()));

            JobParameters parameters = new JobParameters(maps);
            JobExecution jobExecution = jobLauncher.run(job, parameters);

            log.info("JobExecution: {}", jobExecution.getStatus());
            log.info("Batch is Running...");

            // If the launcher is synchronous, run() returns after completion and this loop is skipped.
            // If it is asynchronous, poll at a fixed interval instead of busy-spinning and flooding
            // the log.
            while (jobExecution.isRunning()) {
                log.info("...");
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("Interrupted while waiting for the job; returning current status");
                    break;
                }
            }

            return jobExecution.getStatus();
        }

    }
    

model

  • UserModel

    UserModel 클래스 접기/펼치기 버튼
    package com.techprimers.springbatchexample1.model;
      
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import java.util.Date;
      
    /**
     * JPA entity for one user record that the batch job reads, processes and writes.
     *
     * <p>{@code id} is the primary key. The other persistent fields are {@code name}, {@code dept},
     * {@code salary} and {@code time}.
     */
    @Entity
    public class User {
        @Id
        private Integer id;
        private String name;
        private String dept;
        private Integer salary;
        private Date time;

        /** Creates an empty instance; JPA entity classes need a public no-argument constructor. */
        public User() {
        }

        /** Creates a fully populated user. */
        public User(Integer id, String name, String dept, Integer salary, Date time) {
            this.id = id;
            this.name = name;
            this.dept = dept;
            this.salary = salary;
            this.time = time;
        }

        // --- id ---

        public Integer getId() {
            return this.id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        // --- name ---

        public String getName() {
            return this.name;
        }

        public void setName(String name) {
            this.name = name;
        }

        // --- dept ---

        public String getDept() {
            return this.dept;
        }

        public void setDept(String dept) {
            this.dept = dept;
        }

        // --- salary ---

        public Integer getSalary() {
            return this.salary;
        }

        public void setSalary(Integer salary) {
            this.salary = salary;
        }

        // --- time ---

        public Date getTime() {
            return this.time;
        }

        public void setTime(Date time) {
            this.time = time;
        }

        /**
         * Returns a debug representation containing id, name, dept and salary. The {@code time}
         * field is not included.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("User{");
            text.append("id=").append(id)
                    .append(", name='").append(name).append('\'')
                    .append(", dept='").append(dept).append('\'')
                    .append(", salary=").append(salary)
                    .append('}');
            return text.toString();
        }
    }
    

repository

  • UserRepository

    UserRepository 클래스 접기/펼치기 버튼
    package com.techprimers.springbatchexample1.repository;
      
    import com.techprimers.springbatchexample1.model.User;
    import org.springframework.data.jpa.repository.JpaRepository;
      
    /**
     * Spring Data JPA repository for {@link User} entities keyed by their {@code Integer} id.
     * All operations are inherited from {@link JpaRepository}; no custom queries are declared.
     */
    public interface UserRepository extends JpaRepository<User, Integer> {
        // intentionally empty: every operation is inherited
    }
    

카테고리:

업데이트:

댓글남기기