本篇文章是 Flink 系列 的第三篇,紧接着上一篇文章,本文主要讲述 StreamGraph 是如何转换成 JobGraph 的,在前面的文章中,我们知道 StreamGraph 是根据用户作业的处理逻生成初始的逻辑计划,它并没有做任何的优化,而 JobGraph 将会在原来的基础上做相应的优化(主要是算子的 Chain 操作,Chain 在一起的算子将会在同一个 task 上运行,会极大减少 shuffle 的开销)。刚开始接触的同学可能会有一个疑问,为什么要有 StreamGraph 和 JobGraph 两层的 Graph,这里最主要的原因是为兼容 batch process,Streaming process 最初产生的是 StreamGraph,而 batch process 产生的则是 OptimizedPlan,但是它们最后都会转换为 JobGraph,本文主要是以 Streaming 作业的 StreamGraph 转换为 JobGraph 的处理流程来介绍。

生成 JobGraph 的整体流程

这里我们先看下 FlinkPlan 的实现,它主要有两个实现类:StreamGraph 和 OptimizedPlan,分别对应 Streaming 和 Batch process,不管是哪种类型最后可以转换为 JobGraph:

FlinkPlan 的实现

OptimizedPlan 可以通过 JobGraphGenerator 的 compileJobGraph() 方法来转换为 JobGraph,而 StreamGraph 则可以通过 StreamingJobGraphGenerator 的 createJobGraph() 方法来转换为相应的 JobGraph。其中,StreamGraph 的整体转换流程如下图所示(下图主要展示了这个流程涉及到主要方法调用,比较核心的方法图中也加了颜色,也是本文会着重讲述的方法):

生成 JobGraph 的流程

具体实现流程

StreamingJobGraphGenerator 的 createJobGraph() 的方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//note: 根据 StreamGraph 生成 JobGraph
private JobGraph createJobGraph() {

// make sure that all vertices start immediately
//note: 设置调度模式
jobGraph.setScheduleMode(streamGraph.getScheduleMode());

// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
/**
* note: 为每个 SteamNode 生成一个确定的 hash id,如果提交的拓扑没有改变,则每次生成的 hash id 都是一样的
* note: 这里只要保证 source 的顺序是确定的,就可以保证最后生产的 hash id 不变
* note: 它是利用 input 节点的 hash 值及该节点在 map 中位置(实际上是 map.size 算的)来计算确定的
* note: 实现逻辑见 {@link StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes(StreamGraph)}
*/
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

// Generate legacy version hashes for backwards compatibility
//note: 这个设置主要是为了防止 hash 机制变化时出现不兼容的情况
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}

Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

//note: 最重要的函数,生成 JobVertex/JobEdge 等,并尽可能地将多个节点 chain 在一起
setChaining(hashes, legacyHashes, chainedOperatorHashes);

//note: 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了)
setPhysicalEdges();

//note: 为每个 JobVertex 指定所属的 SlotSharingGroup 以及设置 CoLocationGroup
setSlotSharingAndCoLocation();

//note: checkpoint相关的配置
configureCheckpointing();

//note: 用户的第三方依赖包就是在这里(cacheFile)传给 JobGraph
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

// set the ExecutionConfig last when it has been finalized
try {
//note: 将 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}

return jobGraph;
}

核心步骤如下:

  1. 先给每个 StreamNode 生成一个唯一确定的 hash id;
  2. setChaining() 方法将可以 Chain 到一起的 StreamNode Chain 在一起,这里会生成相应的 JobVertex 、JobEdge 、 IntermediateDataSet 对象,JobGraph 的 Graph 在这一步就已经完全构建出来了;
  3. setPhysicalEdges() 方法会将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了);
  4. setSlotSharingAndCoLocation() 方法主要是 JobVertex 的 SlotSharingGroup 和 CoLocationGroup 设置;
  5. configureCheckpointing() 方法主要是 checkpoint 相关的设置。

基本概念

JobGraph 又引入了几个概念,这里先简单介绍一下。

  1. StreamConfig: 它会记录一个 StreamOperator 的配置信息,它保存了这个 StreamOperator 的基本信息,在这里它会将 StreamGraph 中的 StreamNode 的详细信息同步到它对应的 StreamConfig 对象中;
  2. JobVertex: JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode;
  3. IntermediateDataSet: 它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集;
  4. JobEdge: 它相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex。

如果跟前面的 StreamGraph 做对比,JobGraph 这里不但会对算子做 Chain 操作,还多抽象了一个概念 —— IntermediateDataSet,IntermediateDataSet 的抽象主要是为了后面 ExecutionGraph 的生成。

算子是如何 Chain 到一起的

这里,我们来介绍一下生成的 JobGraph 过程中最核心一步,算子如何 Chain 到一起,先看一下示例,示例与前面两篇文章的示例是一样的(这里因为图片大小限制,去掉了 filter 算子),StreamGraph 及转换后的 JobGraph 如何下图所示:

Operator Chain 的示例

StreamGraph 转换为 JobGraph 的处理过程主要是在 setChaining() 中完成,先看下这个方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator

/**
* Sets up task chains from the source {@link StreamNode} instances.
* note:从 Source StreamNode 实例开始设置 task chain,它将会递归地创建所有的 JobVertex 实例
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
//note: 处理每个 Source StreamNode
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}

private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

if (!builtVertices.contains(startNodeId)) {

List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

//note: 以 Edge 的粒度,记录上下游算子能 chain 在一起的 Edge
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

//note: 当前要处理的 StreamNode
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

//note: 遍历当前的输出节点,判断是否可以 chain 在一起
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) { //note: 如果可以 chain 在一起的话
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}

//note: 递归调用
for (StreamEdge chainable : chainableOutputs) {
//note: 如果可以 chain 在一起的话,这里的 chainIndex 会加 1
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}

for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
//note: 不能 chain 一起的话,这里的 chainIndex 是从 0 开始算的,后面也肯定会走到 createJobVertex 的逻辑
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}

//note: 记录每个 startNodeId 的 hash id(主要是 legacyHashes 中记录的)
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

byte[] primaryHashBytes = hashes.get(currentNodeId);
//note: OperatorID
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}

//note: 记录 chainedName
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
//note: 计算 Chain 之后 node 的 minResources
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
//note: 计算 Chain 之后 node 的资源上限
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

//note: InputFormat & OutputFormat 的处理
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}

if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}

//note: 如果当前节点是 chain 的起始节点, 则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
//note: 这里实际上,如果节点不能 chain 在一起,那么 currentNodeId 跟 startNodeId 肯定是不相等的
//note: createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)//note: chain 的起始 StreamNode
: new StreamConfig(new Configuration());

//note: 设置 JobVertex 的 StreamConfig, 基本上是将 StreamNode 中的配置设置到 StreamConfig 中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

if (currentNodeId.equals(startNodeId)) { //note: 如果走到这里,证明这个 chain 已经完成
//note: chain 中起始 StreamNode
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
//note: Config 中也会记录这个 chain 的出边
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

for (StreamEdge edge : transitiveOutEdges) {
//note: 构建 graph
connect(startNodeId, edge);
}

//note: 将 chain 中所有子节点的 StreamConfig 写入到 headOfChain 节点的 CHAINED_TASK_CONFIG 配置中
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

} else {
//note: 如果是 chain 中子节点
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
//note: 将当前 StreamNode 的 config 记录到该 chain 的 config 集合中
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}

config.setOperatorID(currentOperatorId);

if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;

} else {
return new ArrayList<>();
}
}

这段代码处理完成后,整个 JobGraph 就构建完成了,它首先从会遍历这个 StreamGraph 的 source 节点,然后选择从 source 节点开始执行 createChain() 方法,在具体的实现里,主要逻辑如下(需要配合前面的代码去看,这里会把多个 StreamNode Chain 在一起的 Node 叫做 ChainNode,方便讲述):

  1. createChain() 当前要处理的节点是 currentNodeId,先从 StreamGraph 中拿到这个 StreamNode 的 outEdge(currentNode.getOutEdges()),然后判断这个 outEdge 连接的两个 StreamNode 是否可以 Chain 在一起,判断方法是 isChainable()
  2. 紧接着会有一个递归调用:
    • 对于可以 Chain 在一起的 StreamEdge(这个 Edge 连接两个 StreamNode 是可以 Chain 在一起),会再次调用 createChain() 方法,并且 createChain() 中的 startNodeId 还是最开始的 startNodeId(这个标识了这个 ChainNode 的开始 NodeId),而 chainIndex 会自增加 1;
    • 而对于不能 Chain 在一起的 StreamEdge,createChain() 中的 startNodeId 变成了这个 StreamEdge 的 target StreamNode(相当于如果 Chain 在一起,ChainNode 中的 startNodeId 会赋值为下一个节点的 NodeId,然后再依次类推),chainIndex 又从 0 开始计;
    • 也就是说:createChain() 中的 startNodeId 表示了当前可以 Chain 之后 Node 的 startId,这里,会一直递归调用,直到达到 Sink 节点。
  3. 然后在生成 StreamConfig 对象时,判断当前的 currentNodeIdstartNodeId 是否相等,如果相等的话,证明当前 Node 就是这个 ChainNode 的 StartNode,这里会调用 createJobVertex() 方法给这个 ChainNode 创建一个 JobVertex 对象,最后会返回一个 StreamConfig 对象,如果前面的 id 不相等的话,这里会直接返回一个 StreamConfig 对象(这个对象主要是记录当前 StreamNode 的一些配置,它会同步 StreamGraph 中相关的配置);
  4. 最后还会分两种情况判断:
    • 如果 id 相等,相当于这个 ChainNode 已经完成,先做一些相关的配置(比如:标识当前 StreamNode 为这个 JobVertex 的起始 node),最后再通过 connect() 方法创建 JobEdge 和 IntermediateDataSet 对象,把这个 Graph 连接起来;
    • 如果 id 不相等,那么证明当前 StreamNode 只是这个 ChainNode 的一部分,这里只是同步一下信息,并记录到缓存。

上面就是这个方法的主要实现逻辑,下面会详细把这个方法展开,重点介绍其中的一些方法实现。

如何判断算子是否可以 Chain 在一起

两个 StreamNode 是否可以 Chain 到一起,是通过 isChainable() 方法来判断的,这里判断的粒度是 StreamEdge,实际上就是判断 StreamEdge 连接的两个 StreamNode 是否 Chain 在一起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//note: 是否可以 chain 在一起
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); //note: edge 的 source node
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); //note: edge 的 sink node

//note: 获取输入和输出的 Operator Factory
StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

return downStreamVertex.getInEdges().size() == 1 //note: 下游 Operator 的 Edge 只有一个(如果是多个合并,是无法 Chain 在一起的)
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //note: 对应的 slotSharingGroup 一样
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //note: out operator 允许 chain 操作
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || //note: head Operator 允许跟后面的 chain 在一起
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner) //note: partitioner 是 ForwardPartitioner 类型
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() //note: 并发相等
&& streamGraph.isChainingEnabled(); //note: StreamGraph 允许 Chain 在一起
}

这个方法判断的指标有很多,具体看上面代码就可以明白,这里着重介绍两个:slotSharingGroupedge.getPartitioner()

slotSharingGroup

先看下一个 StreamNode 的 slotSharingGroup 是如何生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// org.apache.flink.streaming.api.graph.StreamGraphGenerator

public static final String DEFAULT_SLOT_SHARING_GROUP = "default";

/**
* Determines the slot sharing group for an operation based on the slot sharing group set by
* the user and the slot sharing groups of the inputs.
* note: 根据这个 operation 设置的 slot sharing group 和 inputs 的 slot sharing group 来确定其 slot sharing group
* note:1. 如果用户指定了 group name,直接使用这个 name;
* note:2. 如果所有的 input 都是同一个 group name,使用这个即可;
* note:3. 否则使用 default group;
*
* <p>If the user specifies a group name, this is taken as is. If nothing is specified and
* the input operations all have the same group name then this name is taken. Otherwise the
* default group is chosen.
*
* @param specifiedGroup The group specified by the user. note: 用户指定的 group name
* @param inputIds The IDs of the input operations. note: 输入 operation 的 id 集合
*/
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
if (!isSlotSharingEnabled) {
return null;
}

if (specifiedGroup != null) {
return specifiedGroup;
} else {
String inputGroup = null;
for (int id: inputIds) {
String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
if (inputGroup == null) {
inputGroup = inputGroupCandidate;
} else if (!inputGroup.equals(inputGroupCandidate)) {
return DEFAULT_SLOT_SHARING_GROUP;
}
}
return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
}
}

一个 StreamNode 的 SlotSharingGroup 会按照下面这个逻辑来确定:

  1. 如果用户指定了 SlotSharingGroup,直接使用这个 SlotSharingGroup name;
  2. 如果所有的 input 都是同一个 group name,使用这个即可;
  3. 否则使用 default group;
Partitioner

这个 StreamEdge 的属性,在创建 StreamEdge 对象会配置这个属性,先看 Flink 中提供的 Partitioner 有哪几种:

StreamPartitioner 的实现

用户可以在自己的代码中调用 DataStream API (比如:broadcast()shuffle() 等)配置相应的 StreamPartitioner,如果这个没有指定 StreamPartitioner 的话,则会走下面的逻辑创建默认的 StreamPartitioner:

1
2
3
4
5
6
7
//org.apache.flink.streaming.api.graph.StreamGraph
//note: 未指定 partitioner 的话,会为其选择 forward(并发设置相同时) 或 rebalance(并发设置不同时)
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}

创建 JobVertex 节点

JobVertex 对象的创建是在 createJobVertex() 方法中实现的,这个方法实现比较简单,创建相应的 JobVertex 对象,并把相关的配置信息设置到 JobVertex 对象中就完成了,这里就不再展开详细介绍了。

connect() 创建 JobEdge 和 IntermediateDataSet 对象

connect() 方法在执行的时候,它会遍历 transitiveOutEdges 中的 StreamEdge,也就是这个 ChainNode 的 out StreamEdge(这些 StreamEdge 是不能与前面的 ChainNode Chain 在一起)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// org.apache.flink.streaming.api.graph.StreamGraphGenerator
private void connect(Integer headOfChain, StreamEdge edge) {

//note: 记录 StreamEdge,这个主要是 chain 之间的边
physicalEdgesInOrder.add(edge);

Integer downStreamvertexID = edge.getTargetId();

//note: 这里 headVertex 指的是 headOfChain 对应的 JobVertex(也是当前 node 对应的 vertex)
JobVertex headVertex = jobVertices.get(headOfChain);
JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

//note: 这个节点的输入数增加 1
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

StreamPartitioner<?> partitioner = edge.getPartitioner();

ResultPartitionType resultPartitionType;
switch (edge.getShuffleMode()) {
case PIPELINED:
resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
break;
case BATCH:
resultPartitionType = ResultPartitionType.BLOCKING;
break;
case UNDEFINED:
resultPartitionType = streamGraph.isBlockingConnectionsBetweenChains() ?
ResultPartitionType.BLOCKING : ResultPartitionType.PIPELINED_BOUNDED;
break;
default:
throw new UnsupportedOperationException("Data exchange mode " +
edge.getShuffleMode() + " is not supported yet.");
}

//note: 创建 JobEdge(它会连接上下游的 node)
JobEdge jobEdge;
if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
jobEdge = downStreamVertex.connectNewDataSetAsInput( //note: 这个方法会创建 IntermediateDataSet 对象
headVertex,
DistributionPattern.POINTWISE, //note: 上游与下游的消费模式,(每个生产任务的 sub-task 会连接到消费任务的一个或多个 sub-task)
resultPartitionType);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL, //note: 每个生产任务的 sub-task 都会连接到每个消费任务的 sub-task
resultPartitionType);
}
// set strategy name so that web interface can show it.
//note: 设置 partitioner
jobEdge.setShipStrategyName(partitioner.toString());

if (LOG.isDebugEnabled()) {
LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
headOfChain, downStreamvertexID);
}
}

真正创建 JobEdge 和 IntermediateDataSet 对象是在 JobVertex 中的 connectNewDataSetAsInput() 方法中,在这里也会把 JobVertex、JobEdge、IntermediateDataSet 三者连接起来(JobGraph 的 graph 就是这样构建的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//org.apache.flink.runtime.jobgraph.JobVertex
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {

//note: 连接 Vertex 的中间数据集
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

//note: 创建对应的 edge
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}

到这里,createChain() 方法就执行完了,在 JobGraph 总共会涉及到三个对象:JobVertex、JobEdge 和 IntermediateDataSet,最后生成的 JobGraph 大概下面这个样子:

JobGraph

JobGraph 的其他配置

执行完 setChaining() 方法后,下面还有几步操作:

  1. setPhysicalEdges(): 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了);
  2. setSlotSharingAndCoLocation(): 为每个 JobVertex 指定所属的 SlotSharingGroup 以及设置 CoLocationGroup;
  3. configureCheckpointing(): checkpoint相关的配置;
  4. JobGraphGenerator.addUserArtifactEntries(): 用户依赖的第三方包就是在这里(cacheFile)传给 JobGraph;

这几个方法的实现比较简单,这里简单看下 configureCheckpointing() 这个方法,其他三个就不再叙述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// org.apache.flink.streaming.api.graph.StreamGraphGenerator
//note: 主要是 checkpoint 相关的配置
private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();

long interval = cfg.getCheckpointInterval();
if (interval < MINIMAL_CHECKPOINT_TIME) {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
}

// --- configure the participating vertices ---

//note: 配置 checkpoint 中要参与的 vertices 节点信息
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
//note: 记录接收 trigger checkpoint msg 的 vertices,当前都是 source 的情况
List<JobVertexID> triggerVertices = new ArrayList<>();

// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
//note: 记录当前需要向 checkpoint coordinator 发送 ack 的 vertices,当前指的是所有的 vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());

// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
//note: 记录接收 'commit checkpoint' 的 vertices,当前也指的是所有 vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}

// --- configure options ---

CheckpointRetentionPolicy retentionAfterTermination;
if (cfg.isExternalizedCheckpointsEnabled()) {
CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
// Sanity check
if (cleanup == null) {
throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
}
retentionAfterTermination = cleanup.deleteOnCancellation() ?
CheckpointRetentionPolicy.RETAIN_ON_FAILURE :
CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
} else {
//note: 默认是 NEVER_RETAIN_AFTER_TERMINATION,作业只要进入终止 checkpoint 就会删除
retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
}

//note: 默认是 EXACTLY_ONCE
CheckpointingMode mode = cfg.getCheckpointingMode();

boolean isExactlyOnce;
if (mode == CheckpointingMode.EXACTLY_ONCE) {
isExactlyOnce = true;
} else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
isExactlyOnce = false;
} else {
throw new IllegalStateException("Unexpected checkpointing mode. " +
"Did not expect there to be another checkpointing mode besides " +
"exactly-once or at-least-once.");
}

// --- configure the master-side checkpoint hooks ---

final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();

for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
Function f = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();

if (f instanceof WithMasterCheckpointHook) {
//note: 它会在 CheckpointCoordinator 端在每次 checkpoint 及 restore 时触发一个 'global action'
//note: 比如这里可以通过这个接口将状态刷到外部存储
hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
}
}
}

// because the hooks can have user-defined code, they need to be stored as
// eagerly serialized values
//note: 这里对 hooks 做一下序列化
final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
if (hooks.isEmpty()) {
serializedHooks = null;
} else {
try {
MasterTriggerRestoreHook.Factory[] asArray =
hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
serializedHooks = new SerializedValue<>(asArray);
}
catch (IOException e) {
throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
}
}

// because the state backend can have user-defined code, it needs to be stored as
// eagerly serialized value
//note: 对 state backend 类做下序列化
final SerializedValue<StateBackend> serializedStateBackend;
if (streamGraph.getStateBackend() == null) {
serializedStateBackend = null;
} else {
try {
serializedStateBackend =
new SerializedValue<StateBackend>(streamGraph.getStateBackend());
}
catch (IOException e) {
throw new FlinkRuntimeException("State backend is not serializable", e);
}
}

// --- done, put it all together ---

//note: 创建一个 JobCheckpointingSettings 对象
JobCheckpointingSettings settings = new JobCheckpointingSettings(
triggerVertices,
ackVertices,
commitVertices,
new CheckpointCoordinatorConfiguration( //note: 创建一个 CheckpointCoordinatorConfiguration 对象
interval,
cfg.getCheckpointTimeout(),
cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
retentionAfterTermination,
isExactlyOnce,
cfg.isPreferCheckpointForRecovery(),
cfg.getTolerableCheckpointFailureNumber()),
serializedStateBackend,
serializedHooks);

jobGraph.setSnapshotSettings(settings);
}

到这里,StreamGraph 转换为 JobGraph 的流程已经梳理完成了,个人感觉这部分还有一些绕的,不过这种开源代码,只要看多几遍,多 debug 看看具体的执行流程,基本都可以搞明白。


参考