本篇文章是 Flink 系列 的第二篇,将会给大家讲述一个 Flink 作业(DataStream 高阶 API 为例的作业)是如何转换为 StreamGraph 的, StreamGraph 可以认为是一个还未经过优化处理的逻辑计划,它完全是在 Client 端生成的。StreamGraph 然后再经过优化转换为 JobGraph,Client 端向 JobManager 提交的作业就是以 JobGraph 的形式提交的,也就是说对于 JobManager 来说,它从客户端接收的作业实际上就是一个 JobGraph,然后它再对 JobGraph 做相应处理,生成具体的物理执行计划进行调度。

关于分布式计算中的 Graph,对于很多人来说,最开始接触和理解这个概念应该还是在 Spark 中。Spark 中有个 DAG (Directed Acyclic Graph,有向无环图)的概念,它包括一些边和一些顶点,其中边代表了 RDD(Spark 中对数据的封装和抽象)、顶点代表了 RDD 上的 Operator,在一个作业中,一旦有 Action 被调用,创建的 DAG 就会被提交到 DAG Scheduler,它会将这个 graph 以 task 的形式调度到不同的节点上去执行计算。Spark 在 MapReduce 的基础上提出了 DAG 的概念,带来了很多的好处,比如:更方便对复杂作业(复杂的 DAG)做全局优化、通过 DAG 恢复丢失的 RDD 等等。Apache Flink 在设计实现中,也借鉴了这个设计,Flink 中的每个作业在调度时都是一个 Graph(Flink 一般叫 DataFlow Graph,Spark 中一般叫作 DAG)。另外,Google 的 Beam 也是类似的概念,Collection 和 Transformation 对数据和操作的最基本抽象,Graph 由 Collection 和 Transformation 构成。

一个 Flink 作业(Steaming 作业),从 Client 端提交到最后真正调度执行,其 Graph 的转换会经过下面三个阶段(第四个阶段是作业真正执行时的状态,都是以 task 的形式在 TM 中运行):

  1. StreamGraph:根据编写的代码生成最初的 Graph,它表示最初的拓扑结构;
  2. JobGraph:这里会对前面生成的 Graph,做一些优化操作(比如: operator chain 等),最后会提交给 JobManager;
  3. ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph,是 Flink 调度时依赖的核心数据结构;
  4. 物理执行图:JobManager 根据生成的 ExecutionGraph 对 Job 进行调度后,在各个 TM 上部署 Task 后形成的一张虚拟图。

这整个转换的内容还是比较多的,也考虑到单篇文章的篇幅问题,这里会先给大家讲述第一部分的转换,也就是 StreamGraph 的转换,同时也会给大家把基本的概念理清楚,便于后面的讲解。

DataSteam API

如果想对后面的内容理解更清楚,首先需要对 DataStream API 的基本概念有一定的理解,Apache Flink 自从 1.0 开始推出 DataStream API 后,经过最近几年的演化,这部分的代码已经变得比较复杂了,有些地方个人感觉还是有些冗余的,这里尽量给大家梳理清楚。

DataStream

A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.

上面是 DataStream 的定义,从这个叙述中,可以看出,DataStream 实际上就是对相同类型数据流做的封装,它的主要作用就是可以用通过 Transformation 操作将其转换为另一个 DataStream,DataStream 向用户提供非常简单的 API 操作,比如 map()filter()flatMap() 等,目前 Flink 1.9 的代码里提供的 DataStream 实现如下:

DataStream 实现

Transformation

A Transformation represents the operation that creates a DataStream。Transformation 代表创建 DataStream 的一个 operation,这里举一个示例,看一下下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// source 节点,随机产生一行一行的英文语句
DataStream<String> inputStream = env.addSource(new RandomWordCount.RandomStringSource());
// wordCount 里的第一步,将单词拆分
inputStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});

这段代码首先会执行 addSource() 操作,它会创建一个 DataStreamSource 节点, 只有创建了 Source 的 DataStream 节点,后面才能对这个 DataStream 做相应的 Transformation 操作(实际上 DataStreamSource 节点也会有一个对应的 SourceTransformation 对象)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
return addSource(function, "Custom Source");
}

//note: 创建一个 DataStreamSource
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
//note: 找到相应的 TypeInformation
if (typeInfo == null) {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}

boolean isParallel = function instanceof ParallelSourceFunction;

clean(function);

//note: 创建一个 Operator
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
//note: 创建 DataStreamSource(这里再创建 DataStreamSource 对象时,会创建一个 SourceTransformation 对象)
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

接下来再看 flatMap() 方法,这个实现其实跟前面的实现有一些类似之处,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);

return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

}

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();

//note: 新的 transformation 会连接上当前 DataStream 中的 transformation,从而构建成一棵树
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation, //note: 记录这个 transformation 的输入 transformation
operatorName,
operator,
outTypeInfo,
environment.getParallelism());

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

//note: 所有的 transformation 都会存到 env 中
getExecutionEnvironment().addOperator(resultTransform);

return returnStream;
}

分析到这里,那么 Transformation 到底是什么呢?这里之所以给大家举这个示例,也是为了让大家对 Transformation 有更深入的了解。这里看下下面这一张图,最开始是一个 SourceTransformation,然后又创建一个 OneInputTransformation 对象(这张图就是这里我们举的示例):

Transformation 转换图

实际上,一个 Transformation ,它是对 StreamOperator 的一个封装(而 StreamOperator 又是对 Function 的一个封装,真正的处理逻辑是在 Function 实现的,当然并不一定所有的 Operator 都会有 Function,这里为了便于理解,就按照这个来讲述了),并且会记录它前面的 Transformation,只有这样才能把这个 Job 的完整 graph 构建出来。这里也可以看到,所有对 DataStream 的操作,最终都是以 Transformation 体现的,DataStream 仅仅是暴露给用户的一套操作 API,用于简化数据处理的实现。

StreamOperator

Operator 最基本类的是 StreamOperator,从名字也能看出来,它表示的是对 Stream 的一个 operation,它主要的实现类如下:

StreamOperator 的实现

  • AbstractUdfStreamOperator:会封装一个 Function,真正的操作是在 Function 中的实现,它主要是在最基础的方法实现上也会相应地调用对应 Function 的实现,比如:open/close方法也会调用 Function 的对应实现等;
  • OneInputStreamOperator:如果这个 Operator 只有一个输入,实现这个接口即可, 这个 processElement() 方法需要自己去实现;
  • TwoInputStreamOperator:如果这个 Operator 是一个二元操作符,是对两个流的处理,比如:双流 join,那么实现这个接口即可,用户需要自己去实现 processElement1()processElement2() 方法。

Function

Function 是 Transformation 最底层的封装,用户真正的处理逻辑是在这个里面实现的,包括前面示例中实现的 FlatMapFunction 对象。

Function 的实现

到这里,终于把最基本这些概念介绍完了,只有对这些概念有了相应的理解之后,阅读源码时才不至于被绕进去。

如何生成 StreamGraph

这里在讲述一个作业转换为 StreamGraph 的细节时,依然以上一篇文章中的示例 —— RandomWordCount 来讲述。在执行 env.getStreamGraph().getStreamingPlanAsJSON() 后,这个 StreamGraph 将会以 JSON 的格式输出出来,输出结果如下:

1
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":4,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":8,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"},{"id":2,"ship_strategy":"REBALANCE","side":"second"}]},{"id":6,"type":"Filter","pact":"Operator","contents":"Filter","parallelism":8,"predecessors":[{"id":4,"ship_strategy":"SHUFFLE","side":"second"}]},{"id":8,"type":"Keyed Aggregation","pact":"Operator","contents":"Keyed Aggregation","parallelism":8,"predecessors":[{"id":6,"ship_strategy":"HASH","side":"second"}]},{"id":9,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":2,"predecessors":[{"id":8,"ship_strategy":"REBALANCE","side":"second"}]}]}

Flink Plan Visualizer中可以看到 StreamGraph 可视化之后 graph(用 Chrome 打开可能会显示不全,可以试下 Firefox),如下如所示:

StreamGraph 展示

接下来,详细介绍一下 StreamGraph 是如何转换的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// StreamExecutionEnvironment
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
*
* @return The streamgraph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraphGenerator().generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
if (transformations.size() <= 0) {
throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
}
//note: 数据处理操作都在这个 transformations 列表里
return new StreamGraphGenerator(transformations, config, checkpointCfg) //note: ExecutionConfig/CheckpointConfig
.setStateBackend(defaultStateBackend) //note: StateBackend = null
.setChaining(isChainingEnabled) //note: isChainingEnabled = true
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic) //note: TimeCharacteristic = ProcessingTime
.setDefaultBufferTimeout(bufferTimeout); //note: default 100
}

StreamGraph 最后是通过 StreamGraphGenerator 的 generate() 方法生成的,那这个方法到底做了什么事情呢?其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//note: 构建 stream graph
public StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig);
streamGraph.setStateBackend(stateBackend);
streamGraph.setChaining(chaining);
streamGraph.setScheduleMode(scheduleMode);
streamGraph.setUserArtifacts(userArtifacts);
streamGraph.setTimeCharacteristic(timeCharacteristic);
streamGraph.setJobName(jobName);
streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

alreadyTransformed = new HashMap<>();

//note: 自底向上(先遍历 input transformations)对转换树的每个 transformation 进行转换
for (Transformation<?> transformation: transformations) {
transform(transformation);
}

final StreamGraph builtStreamGraph = streamGraph;

alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;

return builtStreamGraph;
}

最关键的还是 transform() 方法的实现,这里会根据 Transformation 的类型对其做相应的转换,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* Transforms one {@code Transformation}.
* note:对具体的一个 transformation 进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
* note:返回值为该 transform 的 id 集合,通常大小为1个(除 FeedbackTransformation)
*
* <p>This checks whether we already transformed it and exits early in that case. If not it
* delegates to one of the transformation specific methods.
*/
private Collection<Integer> transform(Transformation<?> transform) {

//note: 已经 Transform 的 Transformation 会放在这个集合中
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}

LOG.debug("Transforming " + transform);

if (transform.getMaxParallelism() <= 0) {

// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
//note: 如果 MaxParallelism 没有设置,使用 job 的 MaxParallelism 设置
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}

// call at least once to trigger exceptions about MissingTypeInfo
//note: 如果是 MissingTypeInfo 类型(类型不确定),将会触发异常
transform.getOutputType();

Collection<Integer> transformedIds;
//note: 根据 transform 的类型,做相应不同的转换
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}

// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}

//note: 将这个 Transform 相关的信息记录到 StreamGraph 中
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
}

if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}

if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled " +
"but no UID or hash has been assigned to operator " + transform.getName());
}
}

if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}

return transformedIds;
}

这里以 transformOneInputTransform() 的实现来举个相应的例子,它会给这个 Transformation 创建相应的 StreamNode,并且创建 StreamEdge 来连接前后的 StreamNode:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

//note: 递归调用,input 的 Transformation 处理完后才能处理后面
Collection<Integer> inputIds = transform(transform.getInput());

// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}

//note: 获取 share group
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

//note: 添加一个 Operator(streamGraph 端会添加一个 StreamNode)
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());

if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}

int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
transform.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(transform.getId(), parallelism);
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

for (Integer inputId: inputIds) {
//note: 根据输入的 id,给这个 node 在 graph 中设置相应的 graph
streamGraph.addEdge(inputId, transform.getId(), 0);
}

return Collections.singleton(transform.getId());
}

经过上面的 transform() 操作,最后生成的 StreamGraph 样板如下图所示:

StreamGraph

关于上面的 transform() ,还有一个需要注意的是:这三个实现方法 transformSelect()transformPartition()transformSideOutput() 在操作时,并不会创建真正的 StreamNode 节点,它们会创建一个虚拟节点,将相应的配置赋给对应的 StreamEdge 即可。另外对于 transformUnion() 方法,它连虚拟节点也不会创建,原因其实看源码也能明白,它们并不包含具体的处理操作。

到这里,StreamGraph 的创建过程就分析完了,如果理解了 Flink 基本对象的抽象后,再去看这部分代码,实际上并不复杂,这里是对用户的作业逻辑做了一个最简单的转换,并没做什么优化操作,相当于还是原生的用户作业逻辑。


参考