From 087d771c5369ca17d328021f7ea92fc466b8fc21 Mon Sep 17 00:00:00 2001 From: Zhengjiaao Date: Mon, 28 Nov 2022 14:14:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(root)=20=E6=96=B0=E5=A2=9E=20starter-batch?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- starter-batch/README.md | 13 ++ starter-batch/pom.xml | 57 ++++++ .../main/java/com/zja/BatchApplication.java | 28 +++ .../java/com/zja/config/OneBatchConfig.java | 166 ++++++++++++++++++ .../java/com/zja/config/Swagger3Config.java | 40 +++++ .../java/com/zja/config/TwoBatchConfig.java | 117 ++++++++++++ .../zja/controller/PersonBatchController.java | 46 +++++ .../zja/controller/UserBatchController.java | 40 +++++ .../java/com/zja/csv/CsvBeanValidator.java | 24 +++ .../main/java/com/zja/csv/CsvJobListener.java | 31 ++++ .../main/java/com/zja/csv/CsvLineMapper.java | 21 +++ .../zja/csv/CsvValidatingItemProcessor.java | 30 ++++ .../src/main/java/com/zja/dto/OneDTO.java | 20 +++ .../src/main/java/com/zja/dto/TwoDTO.java | 20 +++ .../main/java/com/zja/entity/OneEntity.java | 30 ++++ .../main/java/com/zja/entity/TwoEntity.java | 30 ++++ .../listeners/OneJobExecutionListener.java | 33 ++++ .../com/zja/listeners/OneReadListener.java | 35 ++++ .../com/zja/listeners/OneWriteListener.java | 45 +++++ .../src/main/resources/application.yaml | 22 +++ starter-batch/src/main/resources/csv/one.csv | 6 + starter-batch/src/main/resources/csv/user.csv | 5 + .../java/com/zja/BatchApplicationTests.java | 47 +++++ 24 files changed, 907 insertions(+), 1 deletion(-) create mode 100644 starter-batch/README.md create mode 100644 starter-batch/pom.xml create mode 100644 starter-batch/src/main/java/com/zja/BatchApplication.java create mode 100644 starter-batch/src/main/java/com/zja/config/OneBatchConfig.java create mode 100644 starter-batch/src/main/java/com/zja/config/Swagger3Config.java create mode 100644 starter-batch/src/main/java/com/zja/config/TwoBatchConfig.java create mode 100644 starter-batch/src/main/java/com/zja/controller/PersonBatchController.java create mode 100644 starter-batch/src/main/java/com/zja/controller/UserBatchController.java create mode 100644 starter-batch/src/main/java/com/zja/csv/CsvBeanValidator.java create mode 100644 starter-batch/src/main/java/com/zja/csv/CsvJobListener.java create mode 100644 starter-batch/src/main/java/com/zja/csv/CsvLineMapper.java create mode 100644 starter-batch/src/main/java/com/zja/csv/CsvValidatingItemProcessor.java create mode 100644 starter-batch/src/main/java/com/zja/dto/OneDTO.java create mode 100644 starter-batch/src/main/java/com/zja/dto/TwoDTO.java create mode 100644 starter-batch/src/main/java/com/zja/entity/OneEntity.java create mode 100644 starter-batch/src/main/java/com/zja/entity/TwoEntity.java create mode 100644 starter-batch/src/main/java/com/zja/listeners/OneJobExecutionListener.java create mode 100644 starter-batch/src/main/java/com/zja/listeners/OneReadListener.java create mode 100644 starter-batch/src/main/java/com/zja/listeners/OneWriteListener.java create mode 100644 starter-batch/src/main/resources/application.yaml create mode 100644 starter-batch/src/main/resources/csv/one.csv create mode 100644 starter-batch/src/main/resources/csv/user.csv create mode 100644 starter-batch/src/test/java/com/zja/BatchApplicationTests.java diff --git a/pom.xml b/pom.xml index 5a20ed2..3f2c949 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ starter-actuator starter-aop - starter-batch + starter-cache starter-data starter-dubbo diff --git a/starter-batch/README.md b/starter-batch/README.md new file mode 100644 index 0000000..e49dbc2 --- /dev/null +++ b/starter-batch/README.md @@ -0,0 +1,13 @@ +# starter-batch-未完成 + +- [spring-batch 官网](https://spring.io/projects/spring-batch#overview) +- [spring-batch-samples 官方示例](https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples) +- [spring-batch 参考示例](https://blog.csdn.net/qq_37556726/article/details/96028675) + +> Spring Batch是一个开源的、全面的、轻量级的批处理框架,通过Spring Batch可以实现强大的批处理应用程序的开发。 +> Spring Batch还提供记录/跟踪、事务管理、作业处理统计、作业重启以及资源管理等功能。 +> Spring Batch结合定时任务可以发挥更大的作用。 +> Spring Batch提供了ItemReader、ItemProcessor和ItemWriter来完成数据的读取、处理以及写出操作,并且可以将批处理的执行状态持久化到数据库中。 + + + diff --git a/starter-batch/pom.xml b/starter-batch/pom.xml new file mode 100644 index 0000000..ac99217 --- /dev/null +++ b/starter-batch/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + com.zja + spring-boot-starter-test-root + 2.0-SNAPSHOT + + + com.zja + starter-batch + jar + + + + org.springframework.boot + spring-boot-starter-batch + + + + mysql + mysql-connector-java + 8.0.20 + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-test + test + + + io.springfox + springfox-boot-starter + 3.0.0 + + + org.projectlombok + lombok + + + + org.springframework.batch + spring-batch-test + test + + + + diff --git a/starter-batch/src/main/java/com/zja/BatchApplication.java b/starter-batch/src/main/java/com/zja/BatchApplication.java new file mode 100644 index 0000000..7dcb167 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/BatchApplication.java @@ -0,0 +1,28 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-03-11 9:51 + * @Since: + */ +package com.zja; + +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * http://localhost:8080/swagger-ui/index.html#/ + * + * 要实现多Job的情况,需要把EnableBatchProcessing注解的modular设置为true,让每个Job使用自己的ApplicationConext + */ +@SpringBootApplication +@EnableBatchProcessing(modular = false) // 开启Spring Batch支持 +public class BatchApplication { + + public static void main(String[] args) { + SpringApplication.run(BatchApplication.class, args); + } + +} diff --git a/starter-batch/src/main/java/com/zja/config/OneBatchConfig.java b/starter-batch/src/main/java/com/zja/config/OneBatchConfig.java new file mode 100644 index 0000000..84b0856 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/config/OneBatchConfig.java @@ -0,0 +1,166 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-24 16:50 + * @Since: + */ +package com.zja.config; + +import com.zja.dto.OneDTO; +import com.zja.entity.OneEntity; +import com.zja.listeners.OneJobExecutionListener; +import com.zja.listeners.OneReadListener; +import com.zja.listeners.OneWriteListener; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.launch.support.RunIdIncrementer; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.database.builder.JpaItemWriterBuilder; +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +import org.springframework.batch.item.file.mapping.DefaultLineMapper; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.ClassPathResource; +import org.springframework.transaction.PlatformTransactionManager; + +import javax.annotation.Resource; +import javax.persistence.EntityManagerFactory; +import javax.sql.DataSource; + +@Configuration +@EnableBatchProcessing +public class OneBatchConfig extends DefaultBatchConfigurer { + + @Autowired + JobBuilderFactory jobBuilderFactory; + @Autowired + StepBuilderFactory stepBuilderFactory; + @Autowired + DataSource dataSource; + + @Resource + private PlatformTransactionManager transactionManager; + @Resource + private EntityManagerFactory entityManagerFactory; + +/* + @Bean + @Primary + public JpaTransactionManager jpaTransactionManager() { + final JpaTransactionManager tm = new JpaTransactionManager(); + tm.setDataSource(dataSource); + return tm; + }*/ + + /** + * 任务启动器 + */ + /* @Override + protected JobLauncher createJobLauncher() throws Exception { + SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); + jobLauncher.setJobRepository(createJobRepository()); + jobLauncher.afterPropertiesSet(); + return jobLauncher; + }*/ + + /** + * 任务存储 + */ + /* @Override + protected JobRepository createJobRepository() throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDatabaseType("onesql"); + factory.setTransactionManager(transactionManager); + factory.setDataSource(dataSource); + factory.afterPropertiesSet(); + return factory.getObject(); + } +*/ + + /** + * 定义Job + */ + @Bean("oneJob") + public Job job(JobBuilderFactory builder, @Qualifier("oneStep") Step step) { + return builder.get("oneJob") + .incrementer(new RunIdIncrementer()) + .flow(step) + .end() + .listener(new OneJobExecutionListener()) + .build(); + } + + @Bean("oneItemReader") + public ItemReader reader() { + FlatFileItemReader reader = new FlatFileItemReader<>(); + reader.setResource(new ClassPathResource("csv/one.csv")); + reader.setLinesToSkip(1); + reader.setLineMapper(new DefaultLineMapper() { + { + setLineTokenizer(new DelimitedLineTokenizer(",") { + { + setNames("uid", "name"); + } + }); + setFieldSetMapper(new BeanWrapperFieldSetMapper() { + { + setTargetType(OneDTO.class); + } + }); + } + }); + return reader; + } + + @Bean("oneItemProcessor") + public ItemProcessor processor() { + return new ItemProcessor() { + @Override + public OneEntity process(OneDTO item) throws Exception { + OneEntity p = new OneEntity(); + p.setUid(item.getUid()); + p.setName(item.getName()); + return p; + } + }; + } + + @Bean("oneItemWriter") + public ItemWriter itemWriter() { + JpaItemWriterBuilder builder = new JpaItemWriterBuilder<>(); + builder.entityManagerFactory(entityManagerFactory); + return builder.build(); + } + + @Bean("oneStep") + public Step step(StepBuilderFactory stepBuilderFactory, + @Qualifier("oneItemReader") ItemReader reader, + @Qualifier("oneItemWriter") ItemWriter writer, + @Qualifier("oneItemProcessor") ItemProcessor processor) { + return stepBuilderFactory + .get("oneStep") + .chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作) +// .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2) + .reader(reader) + .listener(new OneReadListener()) + .processor(processor) +// .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2) + .writer(writer) + .listener(new OneWriteListener()) + .transactionManager(transactionManager) + .build(); + } + +} diff --git a/starter-batch/src/main/java/com/zja/config/Swagger3Config.java b/starter-batch/src/main/java/com/zja/config/Swagger3Config.java new file mode 100644 index 0000000..328255f --- /dev/null +++ b/starter-batch/src/main/java/com/zja/config/Swagger3Config.java @@ -0,0 +1,40 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2021-10-22 15:17 + * @Since: + */ +package com.zja.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import springfox.documentation.builders.ApiInfoBuilder; +import springfox.documentation.builders.PathSelectors; +import springfox.documentation.builders.RequestHandlerSelectors; +import springfox.documentation.oas.annotations.EnableOpenApi; +import springfox.documentation.service.ApiInfo; +import springfox.documentation.service.Contact; +import springfox.documentation.spi.DocumentationType; +import springfox.documentation.spring.web.plugins.Docket; + +/** + * http://localhost:prot/swagger-ui/index.html#/ + */ +@EnableOpenApi +@Configuration +public class Swagger3Config { + + @Bean + public Docket createRestApi() { + return new Docket(DocumentationType.OAS_30).apiInfo(apiInfo()).select() + .apis(RequestHandlerSelectors.basePackage("com.zja")).paths(PathSelectors.any()) + .build(); + } + + private ApiInfo apiInfo() { + return new ApiInfoBuilder().title("提供rest服务").description("我是描述").contact(new Contact("联系人", "www.baidu.com", "123@qq.com")) + .version("1.0").build(); + } +} diff --git a/starter-batch/src/main/java/com/zja/config/TwoBatchConfig.java b/starter-batch/src/main/java/com/zja/config/TwoBatchConfig.java new file mode 100644 index 0000000..6473f0c --- /dev/null +++ b/starter-batch/src/main/java/com/zja/config/TwoBatchConfig.java @@ -0,0 +1,117 @@ +///** +// * @Company: 上海数慧系统技术有限公司 +// * @Department: 数据中心 +// * @Author: 郑家骜[ào] +// * @Email: zhengja@dist.com.cn +// * @Date: 2022-03-11 11:11 +// * @Since: +// */ +//package com.zja.config; +// +//import com.zja.entity.User; +//import org.springframework.batch.core.Job; +//import org.springframework.batch.core.Step; +//import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +//import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +//import org.springframework.batch.core.configuration.annotation.StepScope; +//import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; +//import org.springframework.batch.item.database.JdbcBatchItemWriter; +//import org.springframework.batch.item.file.FlatFileItemReader; +//import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; +//import org.springframework.batch.item.file.mapping.DefaultLineMapper; +//import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.context.annotation.Bean; +//import org.springframework.core.io.ClassPathResource; +// +//import javax.sql.DataSource; +// +////@Configuration +//public class UserBatchConfig { +// +// @Autowired +//// JobBuilderFactory jobBuilderFactory; +//// @Autowired +// StepBuilderFactory stepBuilderFactory; +// @Autowired +// DataSource dataSource; +// +// /** +// * Spring Batch提供了一些常用的ItemReader, +// * 例如JdbcPagingItemReader用来读取数据库中的数据, +// * StaxEventItemReader用来读取XML数据, +// * 本案例中的FlatFileItemReader则是一个加载普通文件的ItemReader +// */ +// @Bean +// @StepScope +// FlatFileItemReader itemReader() { +// FlatFileItemReader reader = new FlatFileItemReader<>(); +// reader.setLinesToSkip(1);//跳过一行 +// reader.setResource(new ClassPathResource("csv/user.csv"));//配置data.csv文件的位置 +// reader.setLineMapper( // 通过setLineMapper方法设置每一行的数据信息 +// new DefaultLineMapper() {{ +// setLineTokenizer(new DelimitedLineTokenizer() {{ +// setNames("id", "username", "address"); // setNames方法配置了data.csv文件一共有4列 +// setDelimiter(","); // setDelimiter则是配置列与列之间的间隔符 +// }}); +// // 设置要映射的实体类属性 +// setFieldSetMapper(new BeanWrapperFieldSetMapper() {{ +// setTargetType(User.class); +// }}); +// }}); +// return reader; +// } +// +// /** +// * Spring Batch也提供了多个ItemWriter的实现, +// * 常见的如FlatFileItemWriter,表示将数据写出为一个普通文件, +// * StaxEventItemWriter表示将数据写出为XML。 +// * 另外,还有针对不同数据库提供的写出操作支持类,如MongoItemWriter、JpaItemWriter、Neo4jItemWriter以及HibernateItemWriter等, +// * 本案例使用的JdbcBatchItemWriter则是通过JDBC将数据写出到一个关系型数据库中。 +// */ +// @Bean +// JdbcBatchItemWriter jdbcBatchItemWriter() { +// JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); +// writer.setDataSource(dataSource); +// // JdbcBatchItemWriter主要配置数据以及数据插入SQL,注意占位符的写法是“:属性名” +// writer.setSql("insert into user(id,username,address) values(:id,:username,:address)"); +// // 通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符一一映射 +// writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); +// return writer; +// } +// +// /** +// * 配置一个Step +// */ +// @Bean +// Step csvStep() { +// /* +// * Step通过stepBuilderFactory进行配置,首先通过get获取一个StepBuilder, +// * get方法的参数就是该Step的name +// * 然后调用chunk方法的参数2,表示每读取到两条数据就执行一次write操作, +// * 最后分别配置reader和writer。 +// */ +// return stepBuilderFactory.get("csvStep") +// .chunk(2) +// .reader(itemReader()) +// .writer(jdbcBatchItemWriter()) +// .build(); +// } +// +// /** +// * 配置一个Job +// */ +// @Bean +// Job csvJob() { +// /* +// * 通过jobBuilderFactory构建一个Job, +// * get方法的参数为Job的name, +// * 然后配置该Job的Step即可 +// */ +// return jobBuilderFactory.get("csvJob") +// .start(csvStep()) +// .build(); +// } +// +// +//} diff --git a/starter-batch/src/main/java/com/zja/controller/PersonBatchController.java b/starter-batch/src/main/java/com/zja/controller/PersonBatchController.java new file mode 100644 index 0000000..d1910cb --- /dev/null +++ b/starter-batch/src/main/java/com/zja/controller/PersonBatchController.java @@ -0,0 +1,46 @@ +///** +// * @Company: 上海数慧系统技术有限公司 +// * @Department: 数据中心 +// * @Author: 郑家骜[ào] +// * @Email: zhengja@dist.com.cn +// * @Date: 2022-11-25 13:47 +// * @Since: +// */ +//package com.zja.controller; +// +//import org.springframework.http.ResponseEntity; +//import org.springframework.web.bind.annotation.GetMapping; +//import org.springframework.web.bind.annotation.RequestMapping; +//import org.springframework.web.bind.annotation.RestController; +// +//@RestController +//@RequestMapping("/person") +//public class PersonBatchController { +// +// /* @Resource +// @Qualifier("myJob") +// private Job job; +// @Resource +// private JobLauncher launcher; +//*/ +// /** +// * 启动一个批处理 +// * http://127.0.0.1:8080/person/batch +// */ +// @GetMapping("/batch") +// public ResponseEntity queryById() { +///* +// JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); +// try { +// launcher.run(job, jobParameters); +// } catch (JobExecutionAlreadyRunningException +// | JobRestartException +// | JobInstanceAlreadyCompleteException +// | JobParametersInvalidException e) { +// e.printStackTrace(); +// }*/ +// +// return ResponseEntity.ok("成功"); +// } +// +//} diff --git a/starter-batch/src/main/java/com/zja/controller/UserBatchController.java b/starter-batch/src/main/java/com/zja/controller/UserBatchController.java new file mode 100644 index 0000000..a303a52 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/controller/UserBatchController.java @@ -0,0 +1,40 @@ +///** +// * @Company: 上海数慧系统技术有限公司 +// * @Department: 数据中心 +// * @Author: 郑家骜[ào] +// * @Email: zhengja@dist.com.cn +// * @Date: 2022-03-11 11:12 +// * @Since: +// */ +//package com.zja.controller; +// +//import org.springframework.web.bind.annotation.GetMapping; +//import org.springframework.web.bind.annotation.RequestMapping; +//import org.springframework.web.bind.annotation.RestController; +// +//@RestController +//@RequestMapping("/batch") +//public class UserBatchController { +// +// /* @Autowired +// JobLauncher jobLauncher; +// @Autowired +// Job job;*/ +// +// /** +// * 启动一个批处理 +// * http://127.0.0.1:8080/batch/csv +// */ +// @GetMapping("/csv") +// public String hello() { +// try { +// //JobLauncher由框架提供,Job则是刚刚配置的,通过调用JobLauncher中的run方法启动一个批处理 +//// jobLauncher.run(job, new JobParameters()); +// } catch (Exception e) { +// e.printStackTrace(); +// return "执行任务失败."; +// } +// return "执行任务成功."; +// } +// +//} diff --git a/starter-batch/src/main/java/com/zja/csv/CsvBeanValidator.java b/starter-batch/src/main/java/com/zja/csv/CsvBeanValidator.java new file mode 100644 index 0000000..6937519 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/csv/CsvBeanValidator.java @@ -0,0 +1,24 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-24 17:16 + * @Since: + */ +package com.zja.csv; + +/*public class CsvBeanValidator implements Validator, InitializingBean { + + //使用Validator的validate方法校验数据。 + @Override + public void validate(Object o) throws ValidationException { + + } + + //使用JSR-303的Validator来校验我们的数据,在此处进行JSR-303的Validator的初始化。 + @Override + public void afterPropertiesSet() throws Exception { + + } +}*/ diff --git a/starter-batch/src/main/java/com/zja/csv/CsvJobListener.java b/starter-batch/src/main/java/com/zja/csv/CsvJobListener.java new file mode 100644 index 0000000..f318b7b --- /dev/null +++ b/starter-batch/src/main/java/com/zja/csv/CsvJobListener.java @@ -0,0 +1,31 @@ +///** +// * @Company: 上海数慧系统技术有限公司 +// * @Department: 数据中心 +// * @Author: 郑家骜[ào] +// * @Email: zhengja@dist.com.cn +// * @Date: 2022-11-24 17:16 +// * @Since: +// */ +//package com.zja.csv; +// +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.batch.core.JobExecution; +//import org.springframework.batch.core.JobExecutionListener; +// +//@Slf4j +//public class CsvJobListener implements JobExecutionListener { +// +// private long startTime; +// +// @Override +// public void beforeJob(JobExecution jobExecution) { +// startTime = System.currentTimeMillis(); +// log.info("任务处理开始..."); +// } +// +// @Override +// public void afterJob(JobExecution jobExecution) { +// long endTime = System.currentTimeMillis(); +// log.info("任务处理结束,耗时:{} ms", (endTime - startTime)); +// } +//} diff --git a/starter-batch/src/main/java/com/zja/csv/CsvLineMapper.java b/starter-batch/src/main/java/com/zja/csv/CsvLineMapper.java new file mode 100644 index 0000000..12f3a25 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/csv/CsvLineMapper.java @@ -0,0 +1,21 @@ +///** +// * @Company: 上海数慧系统技术有限公司 +// * @Department: 数据中心 +// * @Author: 郑家骜[ào] +// * @Email: zhengja@dist.com.cn +// * @Date: 2022-11-24 17:16 +// * @Since: +// */ +//package com.zja.csv; +// +//import com.zja.entity.User; +//import org.springframework.batch.item.file.LineMapper; +// +//public class CsvLineMapper implements LineMapper { +// +// @Override +// public Object mapLine(String line, int lineNumber) throws Exception { +// String[] lines = line.split(","); +// return new User(lines[0], lines[1], lines[2]); +// } +//} diff --git a/starter-batch/src/main/java/com/zja/csv/CsvValidatingItemProcessor.java b/starter-batch/src/main/java/com/zja/csv/CsvValidatingItemProcessor.java new file mode 100644 index 0000000..f5962c0 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/csv/CsvValidatingItemProcessor.java @@ -0,0 +1,30 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-24 17:35 + * @Since: + */ +package com.zja.csv; + +/* +public class CsvValidatingItemProcessor extends ValidatingItemProcessor { + + @Override + public User process(User item) throws ValidationException { + + // 需执行super.process (item) 才会调用自定义校验器 + super.process(item); + // 对数据做简单的处理 将性别装换为中文 + */ +/*if (item.getSex().equals("1")) { + item.setSex("男"); + } else { + item.setSex("女"); + }*//* + + return item; + } +} +*/ diff --git a/starter-batch/src/main/java/com/zja/dto/OneDTO.java b/starter-batch/src/main/java/com/zja/dto/OneDTO.java new file mode 100644 index 0000000..0370623 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/dto/OneDTO.java @@ -0,0 +1,20 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-28 13:20 + * @Since: + */ +package com.zja.dto; + +import lombok.Data; + +import java.util.Date; + +@Data +public class OneDTO { + private String uid; + private String name; + private Date createTime; +} diff --git a/starter-batch/src/main/java/com/zja/dto/TwoDTO.java b/starter-batch/src/main/java/com/zja/dto/TwoDTO.java new file mode 100644 index 0000000..5719fc7 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/dto/TwoDTO.java @@ -0,0 +1,20 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-28 13:20 + * @Since: + */ +package com.zja.dto; + +import lombok.Data; + +import java.util.Date; + +@Data +public class TwoDTO { + private String uid; + private String name; + private Date createTime; +} diff --git a/starter-batch/src/main/java/com/zja/entity/OneEntity.java b/starter-batch/src/main/java/com/zja/entity/OneEntity.java new file mode 100644 index 0000000..f570e9d --- /dev/null +++ b/starter-batch/src/main/java/com/zja/entity/OneEntity.java @@ -0,0 +1,30 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-02-14 11:03 + * @Since: + */ +package com.zja.entity; + +import lombok.Data; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Table; +import java.util.Date; + +@Data +@Entity +@Table(name = "one_entity") +public class OneEntity { + + @Id + @GeneratedValue + private Long id; + private String uid; + private String name; + private Date createTime; +} diff --git a/starter-batch/src/main/java/com/zja/entity/TwoEntity.java b/starter-batch/src/main/java/com/zja/entity/TwoEntity.java new file mode 100644 index 0000000..7476eb9 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/entity/TwoEntity.java @@ -0,0 +1,30 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-02-14 11:03 + * @Since: + */ +package com.zja.entity; + +import lombok.Data; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.Id; +import javax.persistence.Table; +import java.util.Date; + +@Data +@Entity +@Table(name = "two_entity") +public class TwoEntity { + + @Id + @GeneratedValue + private Long id; + private String uid; + private String name; + private Date createTime; +} diff --git a/starter-batch/src/main/java/com/zja/listeners/OneJobExecutionListener.java b/starter-batch/src/main/java/com/zja/listeners/OneJobExecutionListener.java new file mode 100644 index 0000000..aaca3ae --- /dev/null +++ b/starter-batch/src/main/java/com/zja/listeners/OneJobExecutionListener.java @@ -0,0 +1,33 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-25 13:55 + * @Since: + */ +package com.zja.listeners; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; + +public class OneJobExecutionListener implements JobExecutionListener { + + private Logger logger = LoggerFactory.getLogger(OneJobExecutionListener.class); + + private long startTime; + + @Override + public void beforeJob(JobExecution jobExecution) { + startTime = System.currentTimeMillis(); + logger.info("beforeJob 任务处理开始..."); + } + + @Override + public void afterJob(JobExecution jobExecution) { + long endTime = System.currentTimeMillis(); + logger.info("afterJob 任务处理结束,耗时:{} ms", (endTime - startTime)); + } +} diff --git a/starter-batch/src/main/java/com/zja/listeners/OneReadListener.java b/starter-batch/src/main/java/com/zja/listeners/OneReadListener.java new file mode 100644 index 0000000..334d911 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/listeners/OneReadListener.java @@ -0,0 +1,35 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-25 13:44 + * @Since: + */ +package com.zja.listeners; + +import com.zja.dto.OneDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemReadListener; + +public class OneReadListener implements ItemReadListener { + + private Logger logger = LoggerFactory.getLogger(OneReadListener.class); + + @Override + public void beforeRead() { + logger.info("beforeRead"); + } + + @Override + public void afterRead(OneDTO item) { + logger.info("afterRead:{}", item); + } + + @Override + public void onReadError(Exception ex) { + System.out.println("onReadError"); + logger.error("读取数据错误:{}", ex); + } +} diff --git a/starter-batch/src/main/java/com/zja/listeners/OneWriteListener.java b/starter-batch/src/main/java/com/zja/listeners/OneWriteListener.java new file mode 100644 index 0000000..da8e6b1 --- /dev/null +++ b/starter-batch/src/main/java/com/zja/listeners/OneWriteListener.java @@ -0,0 +1,45 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-25 13:45 + * @Since: + */ +package com.zja.listeners; + +import com.zja.entity.OneEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ItemWriteListener; + +import java.util.List; + +import static java.lang.String.format; + +public class OneWriteListener implements ItemWriteListener { + + private Logger logger = LoggerFactory.getLogger(OneWriteListener.class); + + @Override + public void beforeWrite(List items) { + logger.info("beforeWrite:{}", items); + } + + @Override + public void afterWrite(List items) { + logger.info("afterWrite:{}", items); + } + + @Override + public void onWriteError(Exception exception, List items) { + try { + logger.error("onWriteError:{}", exception.getMessage()); + for (OneEntity item : items) { + logger.info(format("Failed writing BlogInfo : %s", item.toString())); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/starter-batch/src/main/resources/application.yaml b/starter-batch/src/main/resources/application.yaml new file mode 100644 index 0000000..1ef7c26 --- /dev/null +++ b/starter-batch/src/main/resources/application.yaml @@ -0,0 +1,22 @@ + +spring: + datasource: + url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC + username: test + password: pass + driver-class-name: com.mysql.cj.jdbc.Driver +# type: com.alibaba.druid.pool.DruidDataSource # 默 com.zaxxer.hikari.HikariDataSource 数据库连接池类型 + jpa: + show-sql: true # 控制台打印sql + hibernate: + naming: + physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl + ddl-auto: update + batch: + job: + enabled: false # 项目启动完成是否自动执行定义的Job 默 true +# names: job1,job2 # 启动时要执行的Job,默认执行全部Job + jdbc: + initialize-schema: always # 项目启动时执行建表SQL 默 embedded + table-prefix: batch_ # 表的前缀 默 batch_ +# schema: # 初始化执行的*.sql文件位置 默 classpath:org/springframework/batch/core/schema-@@platform@@.sql diff --git a/starter-batch/src/main/resources/csv/one.csv b/starter-batch/src/main/resources/csv/one.csv new file mode 100644 index 0000000..f7822be --- /dev/null +++ b/starter-batch/src/main/resources/csv/one.csv @@ -0,0 +1,6 @@ +uid,name +1,a +2,b +3,c +4,d +5,f diff --git a/starter-batch/src/main/resources/csv/user.csv b/starter-batch/src/main/resources/csv/user.csv new file mode 100644 index 0000000..1b3cde0 --- /dev/null +++ b/starter-batch/src/main/resources/csv/user.csv @@ -0,0 +1,5 @@ +id,username,address +1,李四,上海市 +2,张三,北京市 +3,万二,天津市 +4,一名,杭州市 diff --git a/starter-batch/src/test/java/com/zja/BatchApplicationTests.java b/starter-batch/src/test/java/com/zja/BatchApplicationTests.java new file mode 100644 index 0000000..7c7f0d1 --- /dev/null +++ b/starter-batch/src/test/java/com/zja/BatchApplicationTests.java @@ -0,0 +1,47 @@ +/** + * @Company: 上海数慧系统技术有限公司 + * @Department: 数据中心 + * @Author: 郑家骜[ào] + * @Email: zhengja@dist.com.cn + * @Date: 2022-11-25 14:02 + * @Since: + */ +package com.zja; + +import org.junit.jupiter.api.Test; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.JobParametersInvalidException; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; + +import javax.annotation.Resource; + +@SpringBootTest +public class BatchApplicationTests { + + @Resource + @Qualifier("oneJob") + private Job job; + + @Resource + private JobLauncher launcher; + + @Test + public void oneJob_batch_test() { + JobParameters jobParameters = new JobParametersBuilder().toJobParameters(); + try { + launcher.run(job, jobParameters); + } catch (JobExecutionAlreadyRunningException + | JobRestartException + | JobInstanceAlreadyCompleteException + | JobParametersInvalidException e) { + e.printStackTrace(); + } + } +}