本篇文章是 Flink 系列 的第八篇,在介绍 TaskManager 第二部分之前,先来给介绍一下目前 StreamTask 中基于 MailBox 实现的线程模型,这个模型从 1.9 开始实现,在目前发布的 1.10 版本中,基本上已经改造完成,具体 issue 见 FLINK-12477: Change threading-model in StreamTask to a mailbox-based approach,其设计文档见 Change threading-model in StreamTask to a mailbox-based approach,去年,vinoyang 也写了一篇关于它的介绍,见 重磅!Flink 将重构其核心线程模型。因为 Flink 1.10 已经发布,本篇关于 MailBox 实现的介绍会基于 1.10 最新的代码来讲述(系列的其他篇,没有说明的话,默认还是以 1.9 的代码为例),这个功能在 1.9 中还并没有完全完成,所以本文以 1.10 代码为例讲述。

Motivation

先来看下这个改造/改进最初的动机,在之前 Flink 的线程模型中,会有多个潜在的线程去并发访问其内部的状态,比如 event-processing 和 checkpoint triggering,它们都是通过一个全局锁(checkpoint lock)来保证线程安全,这种实现方案带来的问题是:

  1. 锁对象会在多个类中传递,代码的可读性比较差;
  2. 而且锁对象还暴露给了面向用户的 API(见 SourceFunction#getCheckpointLock());
  3. 在使用时,如果没有获取锁,可能会造成很多问题,使得问题难以定位;

基于上面的这些问题,关于线程模型,提出了一个全新的解决方案 —— MailBox 模型,它可以让 StreamTask 中所有状态的改变都会像在单线程中实现得一样简单。方案借鉴了 Actor 模型的 MailbBox 设计理念,它会让这些 action 操作(需要获取 checkpoint lock 的操作)先加入到一个 阻塞队列,然后主线程再从队列取相应的 mail task 去执行。

设计方案

这里先看下,之前的实现方案中,StreamTask 中 checkpoint lock 都主要用在什么地方:

  1. Event-processing: events、watermarks、barriers、latency markers 等的发送和处理;
  2. Checkpoints: 通过 RPC 向 TaskExecutor 发送 Checkpoint trigger 和 completeness 的通知,以及 Checkpoint 的 trigger 和 cancel 在 event 处理期间也可以通过 barrier 接收到;
  3. Processing Time Timers: 目前 SystemProcessingTimeService 是使用 ScheduledExecutor 异步地处理 processing time timer(而 event time timer 依赖于 Watermark 的处理,并且它同步触发的)。

另外,设计方案不但要能达到排它锁的效果,还要对一些核心环节(比如:event processing)能够做到原子性处理。

下面来看下 MailBox 模型 最初设计文档中的设计(方案方案见:Change threading-model in StreamTask to a mailbox-based approach)。

StreamTask 中要做的改变

这里会在 StreamTask 中引入一个 MailBox 变量,最初的一个想法是将 MailBox 设计为一个 ArrayBlockingQueue(实际上在 1.9 的实现中,使用的是一个 ring buffer,1.10 对这部分又做了重构,后面会介绍)。MailBox 将会取代 StreamTask#run() 方法的角色,而且它还可以处理 Checkpoint event 和 processing timer event,这些 event 都会被封装为一个 task 添加到 MailBox 的队列中,而 MailBox 的主线程(单线程)将会消费这个队列中的 task 进行顺序处理。StreamTask 实现的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
BlockingQueue<Runnable> mailbox = ...
void runMailboxProcessing() {
//TODO: can become a cancel-event through mailbox eventually
Runnable letter;
while (isRunning()) {
while ((letter = mailbox.poll()) != null) { letter.run();
letter.run();
}
defaultAction();
}
}
void defaultAction() {
// e.g. event-processing from an input
}

上面的代码实现只是核心代码大概实现,在真正的实现中还可以做很多优化,队列的公平性也是我们考虑的一个点,之前的抢锁操作是完全没有任何公平性而言的。

client 代码需要做的改变

之前的实现中,Checkpoint lock 通过 getter 暴露给相关的 actor(Checkpoint、processing timer、event processing),而在 MailBox 的实现中,将会把 mailbox 隐藏在 queue 接口后面,仅仅向上层暴露 queue 的 getter 接口。

event 的产生与处理

MailBox 的实现将会极大简化代码的实现,MailBox 模型可以确保这些改变都是由单线程来操作,之前很多需要加锁的代码在新的实现中可以被移除。而为了实现MailBox 模型,需要将之前 run() 方法中 event processing 循环调用处理改为一个 event 有界流处理,举个例子:

One/TwoInputStreamTask 中的下面代码

1
while (running && inputProcessor.processInput())

可以修改为

1
inputProcessor.processInput() // 每次触发,都相当于处理一个有限流

在实现中,会先检查 MailBox 有没有 mail(即加入到队列里的 task 任务)需要处理,有的话,就进行处理,如果没有的话,就执行上面的操作,进行 event processing。

这里有一个问题:就是 SourceStreamTask,会有一个兼容性的问题,因为在流的 source 端,它的 event prcessing 是来专门产生一个无限流数据,在这个处理中,并不能穿插 MailBox 中的 mail 检测,也就是说,如果只有一个 MailBox 线程处理的话,当这个线程去产生数据的话,它一直运行下去,就无法再去检测 MailBox 中是否有新的 mail 到来(在 Source 未来的版本中,可以完美兼容 MailBox 线程设计,见 FLIP-27,但现在的版本还不兼容)。

为了兼容 Source 端,目前的解决方案是:两个线程操作,一个专门用产生无限流,另一个是 MailBox 线程(处理 Checkpoint、timer 等),这两个线程为了保证线程安全,还是使用 Checkpoint Lock 做排它锁,如下图所示(图片来自设计文档):

Source StreamTask Mailbox 实现

Checkpoint 和 timer 的 trigger

对于 Checkpoint 和 timer 的 trigger,这里会发现,目前的这个设计是完全可以满足需求的,Checkpoint 和 Timer 的触发事件都会以一个 Runnable 的形式添加到 MailBox 的队列中,等待 MailBox 主线程去处理。

具体实现

介绍完其设计方案,这里注重看下在 Apache Flink 1.10 的代码中,基于 MailBox 模型 的 StreamTask 是如何实现的。

StreamTask 处理流程

在 Flink 中,当一个作业被调度起来后,对于流计算来说,作业中的 Task 最终会以 StreamTask 的形式去执行,在 1.10 的实现中,一个 StreamTask 的核心处理流程如下:

StreamTask MailBox 模型下核心处理流程

StreamTask 中 invoke()runMailboxLoop() 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// org.apache.flink.streaming.runtime.tasks.StreamTask
public final void invoke() throws Exception {
try {
beforeInvoke();

// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}

// let the task do its work
isRunning = true;
runMailboxLoop();

// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}

afterInvoke();
}
finally {
cleanUpInvoke();
}
}

private void runMailboxLoop() throws Exception {
//note: mailbox 处理
try {
mailboxProcessor.runMailboxLoop();
}
catch (Exception e) {
Optional<InterruptedException> interruption = ExceptionUtils.findThrowable(e, InterruptedException.class);
if (interruption.isPresent()) {
if (!canceled) {
Thread.currentThread().interrupt();
throw interruption.get();
}
} else if (canceled) {
LOG.warn("Error while canceling task.", e);
}
else {
throw e;
}
}
}

最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
/**
* Runs the mailbox processing loop. This is where the main work is done.
* note: mailbox 处理核心流程
*/
public void runMailboxLoop() throws Exception {

final TaskMailbox localMailbox = mailbox;

Preconditions.checkState(
localMailbox.isMailboxThread(),
"Method must be executed by declared mailbox thread!");

//note: MailBox 的状态必须是 OPEN,才能继续循环
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

final MailboxController defaultActionContext = new MailboxController(this);

while (processMail(localMailbox)) { //note: 如果有 mail 需要处理,这里会进行相应的处理,处理完才会进行下面的 event processing
//note: 进行 task 的 default action,也就是调用 processInput()
mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
}
}

上面的方法中,最关键的有两个地方:

  1. processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false;
  2. runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的。

event-processing 处理

对于 StreamTask 来说,event-processing 现在是在 processInput() 方法中实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//org.apache.flink.streaming.runtime.tasks.StreamTask
/**
* This method implements the default action of the task (e.g. processing one event from the input). Implementations
* should (in general) be non-blocking.
* note: 这个方法执行这个 task 默认的 action
*
* @param controller controller object for collaborative interaction between the action and the stream task.
* @throws Exception on any problems in the action.
*/
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
InputStatus status = inputProcessor.processInput(); //note: event 处理
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
//note: 如果输入还有数据,并且 writer 是可用的,这里就直接返回了
return;
}
if (status == InputStatus.END_OF_INPUT) {
//note: 输入已经处理完了,会调用这个方法
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
//note: 告诉 MailBox 先暂停 loop
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
//note: 等待 future 完成后,继续 mailbox loop(等待 input 和 output 可用后,才会继续)
jointFuture.thenRun(suspendedDefaultAction::resume);
}

再结合 MailboxProcessor 中的 runMailboxLoop() 实现一起看,其操作的流程是:

  1. 首先通过 processMail() 方法处理 MailBox 中的 mail
    • 如果没有 mail 要处理,这里直接返回;
    • 先将 MailBox 中当前现存的 mail 全部处理完;
    • 通过 isDefaultActionUnavailable() 做一个状态检查(目的是提供一个接口方便上层控制调用,这里把这个看作一个状态检查方便讲述),如果是 true 的话,会在这里一直处理 mail 事件,不会返回,除非状态改变;
  2. 然后再调用 StreamTask 的 processInput() 方法来处理 event:
    • 先调用 InputProcessor 的 processInput() 方法来处理 event;
    • 如果上面处理结果返回的状态是 MORE_AVAILABLE(表示还有可用的数据等待处理)并且 recordWriter 可用(之前的异步操作已经处理完成),就会立马返回;
    • 如果上面处理结果返回的状态是 END_OF_INPUT,它表示数据处理完成,这里就会告诉 MailBox 数据已经处理完成了;
    • 否则的话,这里会等待,直到有可用的数据到来及 recordWriter 可用。

checkpoint trigger 处理

接着来看下 Checkpoint Trigger 是怎么处理的,要先看下 Streamtask 的 triggerCheckpointAsync() 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
@Override
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {

//note: checkpoint 触发时,提交相应的 task
return mailboxProcessor.getMainMailboxExecutor().submit(
() -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
"checkpoint %s with %s",
checkpointMetaData,
checkpointOptions);
}

这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。

SourceStreamTask 如何兼容

在设计文档中,有个重要的、特别要注意的点就是 SourceStreamTask 的兼容问题,开始的设计方案是在 SourceStreamTask 中专门启动两个线程来保持兼容性问题,而且虽然使用了 MailBox 模型,但还是会继续使用 checkpoint lock 来保证线程安全,这里看下其是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//org.apache.flink.streaming.runtime.tasks.SourceStreamTask
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {

//note: 告诉 MailBox 先暂停 loop
controller.suspendDefaultAction();

// Against the usual contract of this method, this implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface (source functions run as a loop, not in steps).
sourceThread.setTaskDescription(getName());
sourceThread.start();
sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
if (sourceThreadThrowable == null || isFinished) {
//note: sourceThread 完成后,没有抛出异常或 task 完成的情况下
mailboxProcessor.allActionsCompleted();
} else {
//note: 没有完成但结束了或者抛出异常的情况下
mailboxProcessor.reportThrowable(sourceThreadThrowable);
}
});
}


/**
* Runnable that executes the the source function in the head operator.
* note: source 产生 data 的一个线程
*/
private class LegacySourceFunctionThread extends Thread {

private final CompletableFuture<Void> completionFuture;

LegacySourceFunctionThread() {
this.completionFuture = new CompletableFuture<>();
}

@Override
public void run() {
try {
//note: 调用 source Operator 的 run
headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);
completionFuture.complete(null);
} catch (Throwable t) {
// Note, t can be also an InterruptedException
completionFuture.completeExceptionally(t);
}
}

public void setTaskDescription(final String taskDescription) {
setName("Legacy Source Thread - " + taskDescription);
}

CompletableFuture<Void> getCompletionFuture() {
return completionFuture;
}
}

可以看到:

  1. LegacySourceFunctionThread 线程在启动时,会先通知一下 MailBox,这个就是上面说的那个状态检查,收到这个信号之后,MailBox 就会在 processMail() 中一直等待并且处理 mail,不会返回(也就是 MailBox 主线程一直在处理 mail 事件);
  2. LegacySourceFunctionThread 线程就是专门生产数据的,跟 MailBox 这两个线程都在运行。

那么两个线程如何保证线程安全呢?如果仔细看上面的代码就会发现,在 SourceStreamTask 中还继续使用了 getCheckpointLock(),虽然这个方法现在已经被标注了将要被废弃,但 Source 没有改造完成之前,Source 的实现还是会继续依赖 checkpoint lock。

总结

这里,总结一下 Flink 1.10 中 MailBox 模型的核心设计,如下图所示:

MailBox 模型核心设计

  1. MailboxExecutor: 它负责向 MailBox 提交 task 任务;
  2. TaskMailbox: 负责存储相应 task 任务(也就是 mail),它支持多写单读,单线程读取并处理;
  3. MailboxProcessor: MailBox 的核心处理线程,MailboxDefaultAction 是其默认的 action 实现,可以理解为 StreamTask 的 event 处理逻辑就是基于 MailboxDefaultAction 接口实现的。

Flink MailBox 这块的设计还是非常不错的,无论是从代码的可读性上还是后续维护性上都是要比之前的设计好很多,也值得我们学习借鉴。


参考: