介绍
Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。
Spring Batch 使用内存缓冲机制,将读取的数据记录暂存于内存中,然后批量处理这些数据。通过减少对磁盘或数据库的频繁访问,内存缓冲可以提高读取和处理的效率,而且Spring Batch 提供了批量读取的机制,允许一次性读取和处理多个数据记录,这两点都减轻 I/O 压力。
业务方案
企业消息驱动处理;
自动化地处理大批量复杂的数据,如月结计算;
重复性地处理大批量数据,如费率计算;
消息驱动处理,充当内部系统和外部系统的数据纽带;
中间需要对数据进行格式化,校验,转换处理等;
那么还能用到哪些业务场景上呢?
架构图
任务启动器 Job Launcher 负责运行Job,任务存储仓库Job Repository存储着Job的执行状态,参数和日志等信息。Job处理任务又可以分为三大类:
数据读取 Item Reader
数据中间处理 Item Processor
数据输出 Item Writer。
JobLauncher
任务启动器 Job Launcher 负责运行Job
Job
Spring Batch里最基本的单元就是任务Job,一个Job由若干个步骤Step组成。是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job
所定义的内容。
Job相关概念如下:
Job:封装处理实体,定义过程逻辑。
JobInstance:Job的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
JobParameters:与JobInstance相关联的参数。
JobExecution:代表Job的一次实际执行,可能成功、可能失败。
Step
Step
是对Job
某个过程的封装,一个Job
可以包含一个或多个Step
,一步步的Step
按特定逻辑执行,才代表Job
执行完成 。
定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item Reader、Item Processor和Item Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。
Item Reader
Item Processor
Item Write
JobRepository
对整个Job的新增、更新、执行进行记录,存储着Job的执行状态,参数和日志等信息。
准备
创建一个 Springboot 项目
这里我就不多讲了
添加依赖
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation 'mysql:mysql-connector-java:8.0.33'
implementation 'com.baomidou:mybatis-plus-boot-starter:3.5.4'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.batch:spring-batch-test'
}
数据库初始化
任务存储仓库支持多种结构化和非结构化数据库。
可以选择通过yaml配置进行初始化
spring:
batch:
jdbc:
# 设置数据库表前缀
table-prefix: test_
# 初始化schema
initialize-schema: always
platform: mysql
# 事务等级
isolation-level-for-create: default
也可以自己选择执行:
我这里使用mysql,所以把 schema-mysql.sql 导入到数据库中。导入后,库表如下图所示:
application.yml 配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/springbatch
username: test
password: 123456
batch:
job:
# 启动服务的时候是否启动JOB
enabled: false
# 指定需要启动的JOB
#names:
# - ""
jdbc:
# 设置数据库表前缀
table-prefix: test_
# 初始化schema
initialize-schema: always
platform: mysql
# 事务等级
isolation-level-for-create: default
@EnableBatchProcessing
接着在Spring Boot的入口类上添加@EnableBatchProcessing
注解,表示开启Spring Batch批处理功能:
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
至此,基本框架搭建好了,下面开始进行一些测试吧。
测试DEMO
一、入门实践
简单任务
多步骤任务
Flow任务
并行执行
任务决策器
任务嵌套
二、读取数据
读取实现类参考:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/appendix.html#itemReadersAppendix
简单数据读取
文本数据读取
数据库数据读取
XML数据读取
JSON数据读取
多文本数据读取
三、输出数据
输出实现类参考:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/appendix.html#itemWritersAppendix
输出文本数据
输出XML数据
输出JSON数据
输出数据到数据库
多文本输出
四、处理数据
格式校验
数据过滤
数据转换
聚合处理
五、监听器
每种监听器都可以通过两种方式使用:
接口实现;
注解驱动。
使用注解标注的方法名必须和注解一致
六、异常处理
异常重试
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
// 配置错误容忍
.faultTolerant()
// 配置重试的异常类型
.retry(MyJobExecutionException.class)
// 重试3次,三次过后还是异常的话,则任务会结束,
.retryLimit(3)
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常重试
.build();
}
异常跳过
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
// 配置错误容忍
.faultTolerant()
// 配置跳过的异常类型
.skip(MyJobExecutionException.class)
// 最多跳过1次,1次过后还是异常的话,则任务会结束,
.skipLimit(1)
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
.build();
}
异常监听
@Component
public class MySkipListener implements SkipListener<String, String> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("在读取数据的时候遇到异常并跳过,异常:" + t.getMessage());
}
@Override
public void onSkipInWrite(String item, Throwable t) {
System.out.println("在输出数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
}
@Override
public void onSkipInProcess(String item, Throwable t) {
System.out.println("在处理数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
}
}
@Autowired
private MySkipListener mySkipListener;
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.processor(myProcessor())
.writer(list -> list.forEach(System.out::println))
.faultTolerant() // 配置错误容忍
// 配置跳过的异常类型
.skip(MyJobExecutionException.class)
// 最多跳过1次,1次过后还是异常的话,则任务会结束
.skipLimit(1)
// 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
.listener(mySkipListener)
.build();
}
事务问题
一次Step分为Reader、Processor和Writer三个阶段,这些阶段统称为Item。默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue()
来配置数据重读:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
// 消息队列数据重读
.readerIsTransactionalQueue()
.build();
}
我们还可以在Step中手动配置事务属性,事物的属性包括隔离等级(isolation)、传播方式(propagation)以及过期时间(timeout)等:
private Step step() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.transactionAttribute(attribute)
.build();
}
重启机制
默认情况下,任务执行完毕的状态为COMPLETED
,再次启动项目,该任务的Step不会再执行,我们可以通过配置allowStartIfComplete(true)
来实现每次项目重新启动都将执行这个Step:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.allowStartIfComplete(true)
.build();
}
某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit()
来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行:
private Step step() {
return stepBuilderFactory.get("step")
.<String, String>chunk(2)
.reader(listItemReader())
.writer(list -> list.forEach(System.out::println))
.startLimit(1)
.build();
}
关闭重启机制
要关闭Spring Batch启动项目自动运行任务的机制,需要在项目配置文件application.yml中添加如下配置:
spring:
batch:
job:
enabled: false
七、API调度
JobLauncher 任务创建启动
JobOperator 任务操作
评论区