I wrote the code below to read millions of records from one data source (DS) and write them to another (Oracle).
Partitioner:
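import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class DataSyncPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        int totalRecords = 1000000; // hard-coded record count
        int recordsPerPartition = totalRecords / gridSize;
        for (int i = 0; i < gridSize; i++) {
            // Each partition gets its own context holding its id and offset range.
            ExecutionContext context = new ExecutionContext();
            context.putInt("partitionId", i);
            context.putInt("start", i * recordsPerPartition);
            context.putInt("end", (i + 1) * recordsPerPartition);
            result.put("partition" + i, context);
        }
        return result;
    }
}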
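A side note on the range arithmetic in the partitioner: `totalRecords / gridSize` is integer division, so when the record count is not an exact multiple of the grid size, the trailing records fall outside every range. The sketch below is plain Java with no Spring types and is not the code from the question; `PartitionRanges`, `Range`, and `split` are hypothetical names, and treating each range as half-open `[start, end)` is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits [0, totalRecords) into gridSize contiguous ranges. */
final class PartitionRanges {

    /** One half-open range [start, end). */
    record Range(int start, int end) {}

    static Map<String, Range> split(int totalRecords, int gridSize) {
        if (totalRecords < 0 || gridSize <= 0) {
            throw new IllegalArgumentException("totalRecords must be >= 0 and gridSize > 0");
        }
        Map<String, Range> ranges = new LinkedHashMap<>();
        int base = totalRecords / gridSize;      // minimum size of every partition
        int remainder = totalRecords % gridSize; // leftover records to spread out
        int start = 0;
        for (int i = 0; i < gridSize; i++) {
            // The first `remainder` partitions take one extra record each.
            int size = base + (i < remainder ? 1 : 0);
            ranges.put("partition" + i, new Range(start, start + size));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Print the ranges for 10 records split over 3 partitions.
        split(10, 3).forEach((name, range) -> System.out.println(name + " -> " + range));
    }
}
```

In a real `Partitioner`, the same `start` and `end` values would go into each `ExecutionContext` via `putInt`, as the question's code already does.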
Reader, processor, and writer configuration:
@Configuration
public class DataSyncBatchConfig {

    @Bean
    @StepScope
    public DataSyncPartitioner partitioner() {
        return new DataSyncPartitioner();
    }

    // Master (manager) step: partitions the work and hands it to the task executor.
    @Bean
    public Step employeeLoaderMasterStep() throws Exception {
        return new StepBuilder("employeeLoaderMasterStep", getJobRepository())
                .partitioner(employeeLoaderSlaveStep().getName(), partitioner())
                .gridSize(100)
                .taskExecutor(taskExecutor())
                .build();
    }

    // Slave (worker) step: chunk-oriented read/process/write, 100 items per chunk.
    @Bean
    public Step employeeLoaderSlaveStep() throws Exception {
        return new StepBuilder("employeeLoaderSlaveStep", getJobRepository())
                .<Employee, Employee>chunk(100, getTransactionManager1())
                .reader(employeeEntReader())
                .processor(employeeProcessor())
                .writer(accountWriter())
                .build();
    }

    // Reads Employee rows from the source (vdb) data source, paged and sorted by ID.
    @Bean
    public ItemReader<Employee> employeeEntReader() throws Exception {
        return new JdbcPagingItemReaderBuilder<Employee>()
                .name("Employee Reader")
                .dataSource(vdbDataSource())
                .selectClause("""
                        SELECT * FROM Employee
                        """)
                .fromClause("FROM Employee ")
                .sortKeys(Collections.singletonMap("ID", Order.ASCENDING))
                .rowMapper(new EmployeeRowMapper())
                .build();
    }

    // Pass-through processor.
    @Bean
    public ItemProcessor<Employee, Employee> employeeProcessor() {
        return (transaction) -> {
            Thread.sleep(1);
            return transaction;
        };
    }

    // Batch-inserts Employee rows into the target data source.
    @Bean
    public JdbcBatchItemWriter<Employee> accountWriter() {
        JdbcBatchItemWriter<Employee> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(targetDataSource());
        writer.setSql("""
                INSERT INTO EMPLOYEE(ID,NAME,....) VALUES (:id,:versionId,:name...)
                """);
        writer.setItemSqlParameterSourceProvider(BeanPropertySqlParameterSource::new);
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    // Job repository backed by an embedded H2 database.
    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(h2DataSource());
        factory.setTransactionManager(getTransactionManager1());
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean(name = "transactionManager1")
    public PlatformTransactionManager getTransactionManager1() {
        return new ResourcelessTransactionManager();
    }

    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    // Embedded H2 database for the Spring Batch metadata tables.
    @Bean
    @BatchDataSource
    public DataSource h2DataSource() {
        return new EmbeddedDatabaseBuilder()
                .addScript("classpath:/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:/springframework/batch/core/schema-h2.sql")
                .setType(EmbeddedDatabaseType.H2)
                .build();
    }

    // Target data source (write side).
    @Bean
    @ConfigurationProperties("spring.datasource.target")
    @Primary
    public DataSourceProperties targetDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @Primary
    public DataSource targetDataSource() {
        return targetDataSourceProperties()
                .initializeDataSourceBuilder()
                .build();
    }

    // Source data source (read side).
    @Bean
    @ConfigurationProperties("spring.datasource.vdb")
    public DataSourceProperties vdbDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    public DataSource vdbDataSource() {
        return vdbDataSourceProperties()
                .initializeDataSourceBuilder()
                .build();
    }

    // The job consists of the partitioned master step only.
    @Bean
    public Job job() throws Exception {
        return new JobBuilder("employeeSync", getJobRepository())
                .incrementer(new RunIdIncrementer())
                .start(employeeLoaderMasterStep())
                .build();
    }
}
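One thing that may matter once the job does launch: the `taskExecutor()` bean allows 5 threads plus 5 queued tasks, while the master step requests 100 partitions. If the partitions are all submitted up front and the pool rejects work it cannot accept (`AbortPolicy` is the JDK default for `ThreadPoolExecutor`; this sketch assumes the Spring executor behaves the same way), submissions beyond the tenth can be rejected while the first ones are still running. The plain-JDK sketch below reproduces only that pool behavior, with no Spring involved; `PoolSaturationDemo` and `countRejected` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: counts rejected tasks on a small bounded pool. */
final class PoolSaturationDemo {

    /** Submits `tasks` tasks that all block until released; returns how many were rejected. */
    static int countRejected(int poolSize, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity)); // default handler is AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a long-running partition
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // pool and queue are both full
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejected(5, 5, 100));
    }
}
```

If this turns out to apply, a larger queue capacity or a grid size closer to the pool size would be the usual knobs to look at.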
I am trying to launch the job like this:
@Autowired
@Qualifier("jobLauncher")
private JobLauncher jobLauncher;

@Autowired
Job job;

try {
    JobParameters jobParameters = new JobParametersBuilder()
            .addString("jobID", String.valueOf(System.currentTimeMillis()))
            .toJobParameters();
    final JobExecution execution = jobLauncher.run(job, jobParameters);
} catch (Exception e) {
    e.printStackTrace();
    System.out.println("Job failed");
}
I get the error below:
java.lang.IllegalArgumentException: A Step must be provided.
I am not sure whether this code is even the right way to copy data from one data source to another, but I cannot check that yet because the job will not launch. What am I missing?
Tags: java 17, spring batch partitioning, IllegalArgumentException: A Step must be provided (Stack Overflow)