SpringBatch - Partitioning
개요
- SpringBatch에서 제공하는 Scaling 전략(strategy)은 아래 4가지로 정리할 수 있음.
Strategy | Description | Local/Remote |
---|---|---|
Multithreaded step | A step is multithreaded | Local |
Parallel step | Executes steps in parallel using multithreading | Local |
Remote chunking | Distributes chunk processing to remote nodes | Remote |
Partitioning step | Partitions data and splits up processing | Local and Remote |
출처: Spring Batch in Action (2011)
- 이 전략 중에서 Partitioning 구현 예제를 작성함
config
- config 객체를 작성함
- config 객체는 partitioning batch 구동에 필요한 기본적인 빈 객체를 생성함
- 기본적인 빈 객체는 Job(partitionJob), Master step(partitionStep), Slave step(workerStep), Partitioner, PartitionHandler, TaskExecutor가 있음
-
각 객체에 대한 설명은 구현 소스에 주석 형태로 입력함
-
구현 클래스: PartitionBatchConfig
PartitionBatchConfig 클래스 접기/펼치기 버튼
/**
 * Spring configuration that declares the beans needed to run a partitioned batch job:
 * the job ({@code partitionJob}), the master step ({@code partitionStep}), the worker
 * step ({@code workerStep}), the {@link Partitioner}, the partition handler and the
 * task executor.
 *
 * <p>Tunable values are injected from properties with defaults:
 * {@code chunkSize} (100), {@code poolSize} (10), {@code minValue} (1) and
 * {@code maxValue} (100).
 */
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class PartitionBatchConfig {

    public final JobBuilderFactory jobBuilderFactory;
    public final StepBuilderFactory stepBuilderFactory;
    public final ItemReader<User> itemReader;
    public final ItemProcessor<User, User> itemProcessor;
    public final ItemWriter<User> itemWriter;

    /** Number of items per chunk in the worker step. */
    private int chunkSize;

    /** Used both as the thread pool size and as the grid size of the partition handler. */
    private int poolSize;

    /** Lower bound of the value range handed to the partitioner. */
    private int minValue;

    /** Upper bound of the value range handed to the partitioner. */
    private int maxValue;

    @Value("${chunkSize:100}")
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Value("${poolSize:10}")
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    @Value("${minValue:1}")
    public void setMinValue(int minValue) {
        this.minValue = minValue;
    }

    @Value("${maxValue:100}")
    public void setMaxValue(int maxValue) {
        this.maxValue = maxValue;
    }

    /**
     * Builds the job, which consists of the single master step {@link #partitionStep()}.
     *
     * @return the {@code partitionJob} job
     */
    @Bean(name = "partitionJob")
    public Job partitionJob() {
        return jobBuilderFactory.get("partitionJob")
                .incrementer(new RunIdIncrementer())
                .start(partitionStep())
                .build();
    }

    /**
     * Builds the master step, which combines the partitioner with the partition handler.
     *
     * @return the {@code partitionStep} master step
     */
    @Bean(name = "partitionStep")
    public Step partitionStep() {
        return stepBuilderFactory.get("partitionStep")
                .partitioner("partitionStep", partitioner())
                .partitionHandler(partitionHandler())
                .build();
    }

    /**
     * Creates the ExecutionContexts for the partitioned slave steps.
     * The number of ExecutionContexts is driven by the gridSize passed in by the
     * PartitionHandler. ColumnRangePartitioner uses the minimum and maximum of the data
     * to process together with the gridSize to set the range bounds (start, end) that
     * each ExecutionContext carries.
     *
     * @return a partitioner over the configured {@code minValue}..{@code maxValue} range
     */
    @Bean
    public Partitioner partitioner() {
        return new ColumnRangePartitioner(minValue, maxValue);
    }

    /**
     * Builds the chunk-oriented worker step from the injected reader, processor and writer.
     * The chunk size comes from the {@code chunkSize} property instead of a hard-coded
     * literal, so the configured value actually takes effect.
     *
     * @return the {@code workerStep} step
     */
    @Bean(name = "workerStep")
    public Step workerStep() {
        return stepBuilderFactory.get("workerStep")
                .<User, User>chunk(chunkSize)
                .reader(itemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
    }

    /**
     * Defines how the master step handles the slave steps.
     * It specifies the slave step, the gridSize (equal to the number of slave steps)
     * and the taskExecutor (multi-threaded execution).
     *
     * @return the configured partition handler
     */
    @Bean(name = "partitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(poolSize);
        handler.setTaskExecutor(taskExecutor());
        handler.setStep(workerStep());
        return handler;
    }

    /**
     * Uses a ThreadPoolTaskExecutor so that no more than the configured number of
     * threads can be created.
     *
     * @return an initialized executor whose core and max pool size are both {@code poolSize}
     */
    @Bean(name = "taskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(poolSize);
        executor.setThreadNamePrefix("partition-thread");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }
}
batch
-
ColumnRangePartitioner
ColumnRangePartitioner 클래스 접기/펼치기 버튼
package com.techprimers.springbatchexample1.batch.partition;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;

/**
 * Splits the inclusive integer range {@code [min, max]} into consecutive sub-ranges and
 * stores each one in its own {@link ExecutionContext} under the keys {@code "minValue"}
 * and {@code "maxValue"}.
 */
@Slf4j
public class ColumnRangePartitioner implements Partitioner {

    public int min;
    public int max;

    /**
     * @param min lower bound (inclusive) of the range to partition
     * @param max upper bound (inclusive) of the range to partition
     */
    public ColumnRangePartitioner(int min, int max) {
        this.min = min;
        this.max = max;
    }

    /**
     * Divides {@code [min, max]} into sub-ranges of {@code (max - min) / gridSize + 1}
     * values each; the last sub-range is clamped to {@code max}.
     *
     * @param gridSize requested number of partitions; must be positive
     * @return a map from {@code "partition0"}, {@code "partition1"}, ... to the context
     *         holding that partition's bounds; empty when {@code min > max}
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive but was " + gridSize);
        }

        Map<String, ExecutionContext> result = new HashMap<>();

        // long arithmetic: (max - min) and start + targetSize can exceed the int range,
        // which would otherwise wrap around and keep the loop condition true forever.
        long targetSize = ((long) max - min) / gridSize + 1;
        int number = 0;

        for (long start = min; start <= max; start += targetSize) {
            long end = Math.min(start + targetSize - 1, max);
            // Logged after clamping so the message matches the stored bounds.
            log.info("start: {}, end: {}", start, end);

            ExecutionContext value = new ExecutionContext();
            // Both bounds lie within [min, max], so narrowing back to int is lossless.
            value.putInt("minValue", (int) start);
            value.putInt("maxValue", (int) end);
            result.put("partition" + number, value);
            number++;
        }
        return result;
    }
}
controller
-
PartitionController
PartitionController 클래스 접기/펼치기 버튼
package com.techprimers.springbatchexample1.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoint that launches the {@code partitionJob} batch job and reports its status.
 */
@Slf4j
@RestController
public class PartitionController {

    /** Pause between status checks while the job is still reported as running. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    @Autowired
    JobLauncher jobLauncher;

    @Resource(name = "partitionJob")
    Job job;

    /**
     * Launches the job with a single {@code "time"} parameter set to the current time in
     * milliseconds, waits while the execution reports itself as running, and returns the
     * resulting status.
     *
     * <p>The wait sleeps between checks instead of spinning, so it neither pins a CPU core
     * nor floods the log. NOTE(review): whether the loop is ever entered depends on how
     * the injected launcher is configured, which is not visible here.
     *
     * @return the status of the job execution once it is no longer running, or the status
     *         at the moment this thread was interrupted
     * @throws JobParametersInvalidException if the launcher rejects the job parameters
     * @throws JobExecutionException if the launcher fails to start the job
     */
    @GetMapping("partition/load")
    public BatchStatus load() throws JobParametersInvalidException, JobExecutionException {
        Map<String, JobParameter> maps = new HashMap<>();
        maps.put("time", new JobParameter(System.currentTimeMillis()));
        JobParameters parameters = new JobParameters(maps);

        JobExecution jobExecution = jobLauncher.run(job, parameters);
        log.info("JobExecution: {}", jobExecution.getStatus());
        log.info("Batch is Running...");

        while (jobExecution.isRunning()) {
            log.info("...");
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting; report whatever status we have.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for job execution to finish", e);
                break;
            }
        }
        return jobExecution.getStatus();
    }
}
model
-
UserModel
UserModel 클래스 접기/펼치기 버튼
package com.techprimers.springbatchexample1.model;

import javax.persistence.Entity;
import javax.persistence.Id;
import java.util.Date;

/**
 * JPA entity describing a user with an id, name, department, salary and timestamp.
 */
@Entity
public class User {

    @Id
    private Integer id;
    private String name;
    private String dept;
    private Integer salary;
    private Date time;

    /** Creates an empty user; all fields start as {@code null}. */
    public User() {
    }

    /**
     * Creates a fully populated user.
     *
     * @param id     identifier of the user
     * @param name   name of the user
     * @param dept   department of the user
     * @param salary salary of the user
     * @param time   timestamp associated with the user
     */
    public User(Integer id, String name, String dept, Integer salary, Date time) {
        this.id = id;
        this.name = name;
        this.dept = dept;
        this.salary = salary;
        this.time = time;
    }

    public Integer getId() {
        return this.id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDept() {
        return this.dept;
    }

    public void setDept(String dept) {
        this.dept = dept;
    }

    public Integer getSalary() {
        return this.salary;
    }

    public void setSalary(Integer salary) {
        this.salary = salary;
    }

    public Date getTime() {
        return this.time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    /**
     * Renders id, name, dept and salary; the {@code time} field is not part of the output.
     *
     * @return a string of the form {@code User{id=..., name='...', dept='...', salary=...}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", dept='").append(dept).append('\'');
        text.append(", salary=").append(salary);
        text.append('}');
        return text.toString();
    }
}
repository
-
UserRepository
UserRepository 클래스 접기/펼치기 버튼
package com.techprimers.springbatchexample1.repository;

import com.techprimers.springbatchexample1.model.User;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Repository for {@link User} entities with {@link Integer} ids.
 * Declares no methods of its own; everything is inherited from {@link JpaRepository}.
 */
public interface UserRepository
        extends JpaRepository<User, Integer> {
}
댓글남기기