SpringBatch 批处理框架

SpringBatch

使用场景

一个典型的批处理程序:

  • 从数据库、文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改后的形式写回数据。

Spring Batch 自动化了这个基本的批处理迭代,提供了将类似事务作为一组处理的能力,通常是在离线环境中,无需任何用户交互。

业务场景

  • 定期提交批处理
  • 并发批处理:一个作业的并行处理
  • 分阶段的企业消息驱动处理
  • 大规模并行批处理
  • 失败后手动或计划重启
  • 相关步骤的顺序处理(扩展工作流驱动的批处理)
  • 部分处理:跳过记录(例如,在回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况

SpringBatch架构

一个典型的批处理应用程序大致如下:

  • 从数据库,文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改之后的形式写回数据。

其对应的示意图如下:

1

spring batch的总体架构如下:

图 1.1:Spring Batch 分层架构

Application 包含开发人员使用 Spring Batch 编写的所有批处理作业和自定义代码。

Batch Core 包含启动和控制批处理作业所需的核心运行时类。它包括 JobLauncherJob和的实现Step

应用程序和核心都建立在一个通用的 Batch Infrastructure 之上。此基础架构包含常见的读取器和写入器(ItemReaderand ItemWriter)以及服务(如RetryTemplate

Spring Batch 核心概念

图 2.1:批次定型

上图突出了构成 Spring Batch Domain Language的 关键概念。一个 Job 有多个Step(一对多),每个Step正好有一个ItemReader、一个ItemProcessor和一个ItemWriterJob通常使用 JobLauncher来启动,并且需要存储有关当前运行进程的元数据在JobRepository中。

简单来说,ItemProcesseor 用于处理数据,ItemWriter 用于写数据,而每一个定义的job 则都在 JobRepository 里面,我们可以通过 JobLauncher 来启动某一个 job。

Job

Job是封装整个批处理过程的实体。与其他 Spring 项目一样,Job与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以称为“作业配置”。 Job 在 Spring Batch 的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口。如下图所示:

工作层次结构

在 Spring Batch 当中,job 是最顶层的抽象,除 job 之外我们还有 JobInstance 以及 JobExecution 这两个更加底层的抽象。

一个 job 是我们运行的基本单位,它内部由 step 组成。job 本质上可以看成 step 的一个容器。一个 job 可以按照指定的逻辑顺序组合 step,并提供了我们给所有 step 设置相同属性的方法,例如一些事件监听,跳过策略。

使用 java config 的例子代码如下:

  • Job的名称。
  • Step实例的定义和排序。
  • Job是否可重新启动。
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(step1())
                     .next(step2())
                     .next(step3())
                     .build();
}
JobInstance

JobInstance是指Job运行的概念,类似于Job的每个运行实例,是Job的更加底层的一个抽象。

他的定义如下:

public interface JobInstance {
 /**
  * Get unique id for this JobInstance.
  * @return instance id
  */
 public long getInstanceId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
}

他的方法很简单,一个是返回 Job 的 id,另一个是返回 Job 的名字。

例如上图中的“EndOfDay”,他是一个应该在一天结束时运行一次的批处理作业。从定义上来看"EndOfDay" 是一个Job,但在每天运行"EndOfDay"这个作业时,必须单独跟踪每天独立的JobInstance。例如,有 1 月 1 日运行、1 月 2 日运行等。如果 1 月 1 日运行第一次失败并在第二天再次运行,则仍然是 1 月 1 日运行。

因此,每个JobInstance都可以有多个执行,即多个JobExecution,并且只有一个JobInstance对应一个特定的Job(EndOfDay)和JobParameters(1 月 1 日运行)时才可以在给定的时间运行。

JobParameters

“一个JobInstance与另一个JobInstance有何区别?” 答案是: JobParameters

一个JobParameters对象包含一组用于启动批处理作业的参数。它们可用于识别,甚至可在运行期间用作参考数据,如下图所示:

作业参数

在前面的示例中,有两个JobInstance,一个用于 1 月 1 日,另一个用于 1 月 2 日。只有一个Job,但它有两个JobParameter对象:一个以作业参数 2022-01-01 开始,另一个以 2022-01-02 的参数开始。因此可以定义:JobInstance= Job + JobParameters

图片

JobExecution

JobExecution指的是单次尝试运行 Job ,执行可能以失败或成功结束。但JobInstance除非执行成功完成,否则与给定执行对应的执行不被视为完成。

以前面描述的 EndOfDayJob为例,如果 2022-01-01 的JobInstance第一次运行失败,我们使用与第一次运行相同的 JobParameters (2022-01-01) 再次运行它,此时会创建一个新的JobExecution,但是仍然只有一个JobInstance

Step

Step是一个域对象,它封装了批处理作业的独立、顺序阶段,每个 Job 由一个或多个step组成。

Figure 2.1: Job Hierarchy With Steps

StepExecution

StepExecution表示执行一个step。一个Step 每次运行时都会创建一个新的StepExecution,类似于JobExecution,但是一个Step如果因为前面其他的Step失败而无法执行,则不会产生StepExecution。即StepExecution仅在Step实际启动时创建。

此外,每个StepExecution包含一个ExecutionContext,其中包含开发人员需要在批处理运行中持久保存的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了StepExecution的属性:

属性定义
StatusBatchStatus执行状态的对象。运行时,状态为BatchStatus.STARTED。如果失败,则状态为BatchStatus.FAILED。如果成功完成,状态为BatchStatus.COMPLETED
startTime表示开始执行时的当前系统时间(java.util.Date)。如果尚未开始,则此字段为空。
endTime表示执行完成时的当前系统时间(java.util.Date),无论是否成功。如果尚未退出,则此字段为空。
exitStatusExitStatus表示执行结果,它包含一个返回给调用者的退出代码。如果尚未退出,则此字段为空。
executionContext包含需要在执行之间持久化的任何用户数据的“属性包”。
readCount已成功读取的项目数。
writeCount已成功写入的项目数。
commitCount已为本次执行提交的事务数。
rollbackCountStep控制的业务事务被回滚的次数。
readSkipCountread失败导致跳过的次数。
processSkipCountprocess失败导致跳过的次数。
filterCount已被ItemProcessor过滤的项目数。
writeSkipCountwrite失败导致跳过的次数。
ExecutionContext

ExecutionContext 即每一个 StepExecution 的执行环境,是由框架持久化和控制的键/值对的集合,以便开发人员可以在一个地方存储范围为StepExecution对象或JobExecution对象的持久状态。

以上面的 EndOfDay 为例,假设有一个步骤“loadData”将文件加载到数据库中。在第一次运行失败后,元数据表将类似于以下示例:

Table——BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob

Table——BATCH_JOB_EXECUTION_PARAMS

JOB_INST_IDTYPE_CDKEY_NAMEDATE_VAL
1DATEschedule.Date2022-01-01

Table——BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112022-01-01 21:002022-01-01 21:30FAILED

Table——BATCH_STEP_EXECUTION

STEP_EXEC_IDJOB_EXEC_IDSTEP_NAMESTART_TIMEEND_TIMESTATUS
11loadData2022-01-01 21:002022-01-01 21:30FAILED

Table——BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_IDSHORT_CONTEXT
1{piece.count=40321}

假设Job第二天重新启动,它可以检查是否在上下文中存储了上次的运行状态以此来初始化自己,如下面的示例所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}
JobRepository

JobRepository 是一个用于将 job,step 等概念进行持久化的一个类。它同时给 Job 和 Step 以及下文会提到的 JobLauncher 实现提供 CRUD 操作。

当一个 Job首次启动时,从repository中获取一个JobExecution ,并且在执行过程中,StepExecutionJobExecution将被存储到repository

@EnableBatchProcessing 注解可以为 JobRepository 提供自动配置。

JobLauncher

JobLauncher这个接口用于启动一个指定了JobParametersJob。如下所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

上面 run 方法根据传入的 job 以及 jobparamaters JobRepository 获取一个JobExecution并执行 Job

Item Reader

ItemReader是一种抽象,表示检索一个Step的一次输入。当ItemReader通过返回null来表示已经读完所有数据。

Spring Batch 为ItemReader提供了非常多的有用的实现类,比如 JdbcPagingItemReaderJdbcCursorItemReader等等。

ItemReader 支持读入的数据源也非常丰富,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。

下面是一个 JdbcPagingItemReader 的例子代码:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
               .name("creditReader")
               .dataSource(dataSource)
               .queryProvider(queryProvider)
               .parameterValues(parameterValues)
               .rowMapper(customerCreditMapper())
               .pageSize(1000)
               .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

JdbcPagingItemReader 必须指定一个 PagingQueryProvider,负责提供 SQL 查询语句来按分页返回数据。

下面是一个 JdbcCursorItemReader 的例子代码:

private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {

        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }
Item Writer

ItemWriter 自然就是一个写数据的抽象,它是为每一个 step 提供数据写出的功能。

写的单位是可以配置的,可以一次写一条数据,也可以一次写一个 chunk 的数据。ItemWriter 对于读入的数据是不能做任何操作的。

Item Processor

ItemProcessor是表示项目的业务逻辑处理的抽象。当ItemReader读取一个项目并通过ItemWriter写入时,ItemProcessor提供了一个访问点来转换或应用其他业务处理。

即我们可以在ItemWriter还未写入这条记录之前,借助ItemProcessor提供的处理业务逻辑的功能,对数据进行相应操作。如果我们在 ItemProcessor 发现一条数据不应该被写入,可以通过返回 null 来表示。

chunk 处理流程

Spring Batch 在其最常见的实现中使用“面向块”的处理方式。面向块的处理是指一次读取一个数据并创建在事务边界内写出的“块”。一旦读取的项目数等于提交间隔,整个块被 ItemWriter写出,然后事务被提交。下图显示了该过程:

面向块的处理

伪代码:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read();
    if (item != null) {
        items.add(item);
    }
}
itemWriter.write(items);

也可以使用可选的ItemProcessor配置面向块的step,在传递给 ItemWriter 之前对其进行处理。 下图显示了在step中注册 ItemProcessor 时的过程:

使用项目处理器的面向块的处理

由于我们一次batch的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高。

因此 Spring Batch 提供了 chunk 这个概念,我们可以设定一个 chunk size,spring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到 chunk size 设定的值得时候,才一起去 commit。

/**
 * Note the JobRepository is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
    			.repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
	return this.stepBuilderFactory.get("sampleStep")
				.transactionManager(transactionManager)
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

上面的配置包括创建item-based的step所需的唯一依赖项:

  • reader:提供要处理的项目的 ItemReader

  • writer:处理 ItemReader 提供的项目的 ItemWriter

  • transactionManager:Spring 的 PlatformTransactionManager,在处理过程中开始并提交事务。

  • repositoryJobRepository 的 Java 特定名称,在处理期间(在提交之前)定期存储 StepExecutionExecutionContext

  • chunk:Java 特定名称,表明这是一个item-based的step,以及在提交事务之前要处理的项目数。

在上面这个 step 里面,chunk size 被设为了 10,当 ItemReader 读的数据数量达到 10 的时候,这一批次的数据就一起被传到 itemWriter,同时 transaction 被提交。

注意:

  • repository 默认为 jobRepositorytransactionManager 默认为 transactionManager(均通过 @EnableBatchProcessing 提供)。

  • ItemProcessor 是可选的,因为可以直接从reader传递给writer。

skip策略和失败处理

在许多情况下,处理过程中遇到的错误不应该导致 Step 失败,而是应该跳过。 这通常是必须由了解数据本身及其含义的人做出决定。

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(Exception.class)
				.noSkip(FileNotFoundException.class)
				.build();
}

我们需要留意这三个方法,分别是 skipLimit()skip()noSkip()

  • skipLimit():可以设定一个我们允许的这个 step 可以跳过的异常数量,假如我们设定为 10,则当这个 step 运行时,只要出现的异常数目不超过 10,整个 step 都不会 fail。

注意,若不设定 skipLimit,则其默认值是 0。

  • skip():可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。

  • noSkip():可以指定不想跳过的异常。

从上面的例子来说,也就是跳过所有除 FileNotFoundException exception

那么对于这个 step 来说,FileNotFoundException 就是一个致命的 exception,抛出这个 exception 的时候 step 就会直接 fail。

异常-重试

并非所有异常都是确定性的。 如果在读取时遇到 FlatFileParseException,则始终为该记录抛出它,重置 ItemReader 没有帮助。

但是,对于 DeadlockLoserDataAccessException这种异常,它表明当前进程已尝试更新另一个进程持有锁定的记录。 等待并再次尝试可能会成功。

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(DeadlockLoserDataAccessException.class)
				.build();
}

step通过retryLimit()方法设置允许限制单个项目可以重试的次数为3次,retry()方法设置可重试的异常列表。

回滚

默认情况下,无论重试还是跳过,ItemWriter 抛出的任何异常都会导致 Step 控制的事务回滚。 如果按照前面所述配置跳过,则从 ItemReader 抛出的异常不会导致回滚。

但是,在许多情况下,ItemWriter 抛出的异常不应导致回滚,因为没有采取任何措施使事务无效。 因此,可以使用noRollback()方法为 Step 配置一个不应导致回滚的异常列表。

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(ValidationException.class)
				.build();
}

ItemReader 的基本约定是它只是转发,step的buffers reader输入,因此在回滚的情况下,不需要从reader重新读取项目。 但是,在某些情况下,reader构建在事务资源之上,例如 JMS 队列。 在这种情况下,由于队列与回滚的事务相关联,因此从队列中拉出的消息将被放回。 因此,可以使用readerIsTransactionalQueue()方法将该步骤配置为不缓冲项目。

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue()
				.build();
}

使用 Spring Batch 时的值得注意的点

批处理原则

在构建批处理解决方案时,应考虑以下关键原则和注意事项:

  • 批处理体系结构通常会影响体系结构
  • 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
  • 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)。
  • 最大限度地减少系统资源的使用,尤其是 I/O。在 internal memory 中执行尽可能多的操作。
  • 查看应用程序 I/O(分析 SQL 语句)以确保避免不必要的物理 I/O。特别是,需要寻找以下四个常见缺陷:当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据;重新读取先前在同一事务中读取数据的事务的数据;导致不必要的表或索引扫描;未在 SQL 语句的 WHERE 子句中指定键值。
  • 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据。
  • 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配。
  • 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性。
  • 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总。
  • 在具有真实数据量的类似生产环境中尽早计划和执行压力测试。
  • 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以 24-7 在线的情况运行。数据库备份通常在在线设计中得到很好的处理,但文件备份应该被视为同样重要。如果系统依赖于文件,则文件备份过程不仅应该到位并记录在案,还应定期进行测试。
默认不启动 job

在使用 java config 使用 Spring Batch 的 job 时,如果不做任何配置,项目在启动时就会默认去跑我们定义好的批处理 job。那么如何让项目在启动时不自动去跑 job 呢?

Spring Batch 的 job 会在项目启动时自动 run,如果我们不想让他在启动时 run 的话,可以在 application.properties 中添加如下属性:

spring.batch.job.enabled=false
读数据时内存不够

在使用 Spring Batch 做数据迁移时,发现在 job 启动后,执行到一定时间点时就卡在一个地方不动了,且 log 也不再打印,等待一段时间之后,得到如下错误:

图片

红字的信息为:

Resource exhaustion event:the JVM was unable to allocate memory from the heap.

意思就是项目发出了一个资源耗尽的事件,告诉我们 java 虚拟机无法再为堆分配内存。

造成这个错误的原因是:这个项目里的 batch job 的 reader 是一次性拿回了数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。

解决的办法有两个:

  • 调整 reader 读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
  • 增大 service 内存
  • 1
    点赞
  • 10
    收藏
    觉得还不错? 一键收藏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值