admin管理员组

文章数量:1336331

Wrote below code to read and write millions of records from 1 DS to another DS - Oracle. Partitioner:

public class DataSyncPartitioner implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();            
        int totalRecords = 1000000;
        int recordsPerPartition = totalRecords / gridSize;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("partitionId", i);
            context.putInt("start", i * recordsPerPartition);
            context.putInt("end", (i + 1) * recordsPerPartition);

            result.put("partition" + i, context);
        }

        return result;
    }

Reader, Writer and Processor

@Configuration
public class DataSyncBatchConfig {

    @Bean
    @StepScope
    public DataSyncPartitioner partitioner() {
        return new DataSyncPartitioner();
    }

    @Bean
    public Step employeeLoaderMasterStep() throws Exception {

        return new StepBuilder("employeeLoaderMasterStep", getJobRepository())
                .partitioner(EmployeeLoaderSlaveStep().getName(), partitioner())
                .gridSize(100)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step employeeLoaderSlaveStep() throws Exception {

        return new StepBuilder("employeeLoaderSlaveStep", getJobRepository())
                .<Employee, Employee>chunk(100, getTransactionManager1())
                .reader(employeeEntReader())
                .processor(employeeProcessor())
                .writer(accountWriter())
                .build();
    }


    @Bean
    public ItemReader<Employee> employeeEntReader() throws Exception {
        return new JdbcPagingItemReaderBuilder<Employee>()
                .name("Employee Reader")
                .dataSource(vdbDataSource())
                .selectClause("""                                
                        SELECT * FROM Employee
                        """)
                .fromClause("FROM Employee ")
                .sortKeys(Collections.singletonMap("ID", Order.ASCENDING))
                .rowMapper(new EmployeeRowMapper())
                .build();
    }

    @Bean
    public ItemProcessor<Employee, Employee> employeeProcessor() {
        return (transaction) -> {
            Thread.sleep(1);
            return transaction;
        };
    }

    @Bean
    public JdbcBatchItemWriter<Employee> accountWriter() {
        JdbcBatchItemWriter<Employee> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(targetDataSource());
        writer.setSql("""                        
                INSERT INTO EMPLOYEE(ID,NAME,....) VALUES (:id,:versionId,:name...)
                """);
        writer.setItemSqlParameterSourceProvider(BeanPropertySqlParameterSource::new);
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean(name = "jobRepository")
    public JobRepository getJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(h2DataSource());
        factory.setTransactionManager(getTransactionManager1());
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean(name = "transactionManager1")
    public PlatformTransactionManager getTransactionManager1() {
        return new ResourcelessTransactionManager();
    }

    @Bean(name = "jobLauncher")
    public JobLauncher getJobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    @BatchDataSource
    public DataSource h2DataSource() {
        return new EmbeddedDatabaseBuilder()
                .addScript("classpath:/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:/springframework/batch/core/schema-h2.sql")
                .setType(EmbeddedDatabaseType.H2)
                .build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.target")
    @Primary
    public DataSourceProperties targetDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    @Primary
    public DataSource targetDataSource() {
        return targetDataSourceProperties()
                .initializeDataSourceBuilder()
                .build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.vdb")
    public DataSourceProperties vdbDataSourceProperties() {
        return new DataSourceProperties();
    }

    @Bean
    public DataSource vdbDataSource() {
        return vdbDataSourceProperties()
                .initializeDataSourceBuilder()
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return new JobBuilder("employeeSync", getJobRepository())
                .incrementer(new RunIdIncrementer())
                .start(employeeMasterStep())
                .build();
    }
}

And trying to invoke

@Autowired
@Qualifier("jobLauncher")
private JobLauncher jobLauncher;
 
@Autowired
Job job;

try {
        JobParameters jobParameters = new JobParametersBuilder().addString("jobID", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

  final JobExecution execution = jobLauncher.run(job, jobParameters);
  } catch (Exception e) {
    e.printStackTrace();
    System.out.println("Job failed");
  }

Getting below error

java.lang.IllegalArgumentException: A Step must be provided.

Not sure whether this code works for copying from DS to another, before checking that, i could not invoke the job, missing something

本文标签: java 17spring batch partitioningIllegalArgumentException A Step must be providedStack Overflow