Stateful Function(简称 StateFun)从 2019 正式对外宣布之后,今年 4 月份已经发了 2.0 版(并且是作为 Apache Flink 项目中的一部分发布),7 月份也发布了 2.1.0 版。在 2.0 的架构中 Function 已经从 JVM 中解耦出来,只需要通过 HTTP/gRPC 来调用即可,新的架构可以充分利用 FAAS 的能力。本篇文章就来简单看下 StateFun 的架构及应用示例,后面还会有陆续有两篇文章来深入剖析一些其内部实现。

StateFun 的架构

StateFun 2.0 的架构与 1.0 做了非常大的变化,Function 部分已经完全与 JVM 部分解耦,Function 部分可以单独进行部署,直接部署在 FAAS 上或者直接使用 Kubernetes 启动相应的 HTTP/RPC 服务都是可以的,如下图所示:

Flink StateFun 2.0 Architecture

Flink TaskManagers 从 Ingress 系统(如:kafka、kinesis 等)中接收数据,并且将它们发送给对应的 StateFul Functions 中,经过 Function 计算完后,再发送回 TM,TM 再根据 target address 信息将其发送给其他的 Function 或 Egress 系统(如:kafka、kinesis 等)。

这里先看下 StateFun 框架的几个概念:

  1. Ingress:StateFun 的事件输入源,它可以是 queue、logs 或者 HTTP servers,当前 StateFun 内部已经支持的是 Kafka 和 Kinesis,类似于 Flink Streaming Job 中的 Source Operator;
  2. Egress:StateFun 的事件输出源,与 Ingress 类似,在一个 StateFun Application 中,Egress 并不是必需的、是可选的,当前内部支持的是 Kafka 和 Kinesis,类似于 Flink Streaming Job 中 Sink Operator;
  3. Stateful Functions:它就类似于 FAAS 中的 Function,是应用真正做计算的地方,在 StateFun 中流转的每一条 Event,都需要指定其 target address 来表明它需要发向哪个 Function 或 Egress。

在这套架构中,Flink Cluster 主要是做 state 一致性保证及 event 路由转发的功能,FAAS 专注于其计算(无需 care 状态存储及一致性的问题)。实际上,在这套系统下,Flink 相当于去掉了传统数据库的角色,因为 Flink 更适合用于 event 驱动的函数和服务,通过集成状态存储,保证了函数或服务间传递消息的有状态性。

Event-driven Database vs. Request/Response Database

在传统的数据库或者 Key/Value 存储(这里称之为 Request/Response Database)中,应用需主动发送一个查询到数据库(如 SQL via JDBC、GET/PUT via HTTP)。然而,在 StateFun 这类事件驱动数据库中,这个关系被反转了:数据库根据到达的消息来调用函数或服务。这个特性非常适合 FaaS 或者事件驱动架构的应用。

Stateful Functions 2.0 inverts the relationship between database and application

基于请求/响应数据库的应用中,数据库只负责保存状态。函数或服务间的通讯通常一个独立的服务层进行处理。相反,事件驱动数据库以紧密集成的方式既保存了状态的存储,又承担了消息的传输。

另外 StateFun 的架构还有两个优势:

  1. 借助 Flink 的 Checkpoint 来实现 Exactly once,而如果使用数据库的话这些都需要业务自己来做,业务比较难做到整个链路的 Exactly once;
  2. 数据库一般都会有从库,在向数据库发送一个读请求时,有可能读取的不是最新的数据,而在 StateFun 中,数据只会存储在一个 StateFun Cluster Partition 中(后续文章会介绍),就不会有这个问题。

StateFun 核心组件

StateFun 2.0 中,一个 StateFun 应用所涉及的核心组件如下图所示:

Flink StateFun 2.0 Architecture Components

在上图中,也可以看到 Flink TaskManagers 中它的主要作用就是接收消息、管理状态、将 event 转发到不同的 Function 以及将数据通过 Egress 发送出去。

在这里,要说明的是,Function 之间并不是直接交流的,数据路由发送都有是由 TM 来操作,TM 将一条 Event 发送给一个 Function,它处理后,会将结果及 target adress 发送回 TM,再由 TM 根据 target address 发送到下游。这些 Function 所使用到的持久化状态都是在 TM 中维护,本身依赖了 Flink 的 StateBackend 及 Checkpoint 机制。

上图中的 Function Dispatcher 表示的是 Function 的部署方式,图中使用的是 Remote Function。

StateFun 三种部署方式

在前面 StateFun 所涉及的核心组件图中,Function Dispatcher 在调用函数时,函数是可以有多种部署选择的。

Remote Functions

2.0 架构中,一个比较大的 Feature 就是支持了 Remote Function,它完全与底层 Flink 集群解耦,通过 HTTP/gRPC 与 Flink TaskManager 进行交互,如下图所示:

Remote Functions

简单来说,Remote Functions 的意思就是函数是独立部署的,从物理上和 Flink Cluster 是分开的。Flink Task Managers 和函数之间的沟通是通过 HTTP/gRPC 请求来完成的。

Co-located Functions

其架构如下图所示:

Co-located Functions

这种部署方式就是将函数和 TaskManager 的进程部署在一个实例(Pod 或者机器)上,用不同的容器或者进程隔离开来,例如 K8S 中的 sidecar 这种模式。TaskManager 就可以和函数直接在本地通信,但也失去了 FAAS 独立扩缩的能力。

Embedded Functions

Embedded Functions

这种部署模式更加直接,函数和 TaskManagers 直接在同一个容器内,像 Stateful Functions 1.0 就是这种模式,用高的耦合度换取了高的性能,但损失了灵活性和扩展性,它本质上就完全类似于一个 Flink Streaming Job。

StateFun 示例

在介绍 StateFun 示例之前,还有两个概念,需要简单看下,那就是 RouterModule,它 StateFun API 中比较核心的抽象(针对 Java SDK 而言,Python SDK 抽象得更简单)。

Router

Router 的含义,这里可以从两个方面来理解:

  1. 从 StateFun 的角度,它为 Ingress 指定了其要发送的 Function;
  2. 从 Flink 的角度,它有两个作用:一是指定下游的 FunctionType(要发送的 Function),二是指定的了其 keyBy shuffle 时使用的 key(StateFun 的状态深度使用了 Flink 中 keyby 操作,这里会在后面的文章详细介绍)。

举一个 Java 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final class AddToCartRouter implements Router<ProtobufMessages.AddToCart> {
@Override
public void route(
ProtobufMessages.AddToCart message, Downstream<ProtobufMessages.AddToCart> downstream) {
downstream.forward(Identifiers.CART, message.getUserId(), message);
}
}

// forward 的方法说明
/**
* Forwards the message as an input to a downstream function, addressed by a specified {@link
* FunctionType} and the functions unique id within its type.
*
* @param functionType the target function's type.
* @param id the target function's unique id.
* @param message the message being forwarded.
*/
default void forward(FunctionType functionType, String id, T message) {
forward(new Address(functionType, id), message);
}

这里的 Address 就是前面说的 target address,它唯一表示了一个 Function,表示要发送的 Function,由两部分组成:FunctionType 指明了具体的 Function,id 表示在 Flink keyby shuffle 时的 key 值。而如果这里要发送的是 Egress 的话,直接使用 EgressIdentifier 来区分而不需要再设置 id

在上面的示例中,这个 Router 就指明了 Ingress 数据要发送的下游 Function 信息。

Module

在 StateFun 中,Module 是一个用于添加核心模块的一个入口,它把 Ingress、Egress、Routers 及 Stateful Function bind 在一起。一个简单 Java 示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@AutoService(StatefulFunctionModule.class)
public final class GreetingModule implements StatefulFunctionModule {

private static final String KAFKA_KEY = "kafka-address";

private static final String DEFAULT_KAFKA_ADDRESS = "kafka-broker:9092";

@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {

// pull the configured kafka broker address, or default if none was passed.
String kafkaAddress = globalConfiguration.getOrDefault(KAFKA_KEY, DEFAULT_KAFKA_ADDRESS);
GreetingIO ioModule = new GreetingIO(kafkaAddress);

// bind an ingress to the system along with the router
binder.bindIngress(ioModule.getIngressSpec());
binder.bindIngressRouter(GreetingIO.GREETING_INGRESS_ID, new GreetRouter());

// bind an egress to the system
binder.bindEgress(ioModule.getEgressSpec());

// bind a function provider to a function type
// note: provider 可以决定这个 function 交互方式,可以使 HTTP 或 GRPC 的形式
binder.bindFunctionProvider(GreetStatefulFunction.TYPE, unused -> new GreetStatefulFunction());
}
}

对于一个 Module 实现,首先需要实现 StatefulFunctionModule 相关的接口,并且用 @AutoService(StatefulFunctionModule.class) 来修饰,这里使用了 Java SPI 的技术(不展开讨论),在 configure() 方法中,将这个 StateFun 应用的需要绑定的组件定义出来,组件的顺序是没有要求的(与 DataStream API 不同),内部在解析时是通过 Target Address 来确定下游的。

在一个 StateFun 应用中可以有多个 Module,用于绑定不同的组件,可以方便团队协同开发(举个例子:一个 Module 绑定一个组件模块,由不同的同学开发不同的组件模块),不过在一个 StateFun 中,只会有一个 Binder,也就是说,多个 module 最终都会被一个 Binder 连接起来。

Java SDK 示例

在官方仓库中有一个 Java 的示例 —— The Greeter Example,这个示例比较简单,从 kafka 中接收 event 数据(这里可以认为是 user name),在 Function 中会记录每个 event(user)出现的次数,根据出现的次数返回相应的结果,最后将结果写出到一个 Kafka Topic 中,先来看下其 Module 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@AutoService(StatefulFunctionModule.class)
public final class GreetingModule implements StatefulFunctionModule {

// kafka 集群的配置
private static final String KAFKA_KEY = "kafka-address";

// kafka 集群的配置
private static final String DEFAULT_KAFKA_ADDRESS = "kafka-broker:9092";

@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {

// IO 模块的初始化,这里初始化了 Ingree 和 Egress 部分
// pull the configured kafka broker address, or default if none was passed.
String kafkaAddress = globalConfiguration.getOrDefault(KAFKA_KEY, DEFAULT_KAFKA_ADDRESS);
GreetingIO ioModule = new GreetingIO(kafkaAddress);

// 绑定 Ingress 模块,并设置相应的 Router,为 Ingress 数据源指定下游 Function 信息
// bind an ingress to the system along with the router
binder.bindIngress(ioModule.getIngressSpec());
binder.bindIngressRouter(GreetingIO.GREETING_INGRESS_ID, new GreetRouter());

// 绑定一个 Egress
// bind an egress to the system
binder.bindEgress(ioModule.getEgressSpec());

// 绑定相应的 Function,并指明这个 Function 的交互方式
// bind a function provider to a function type
binder.bindFunctionProvider(GreetStatefulFunction.TYPE, unused -> new GreetStatefulFunction());
}
}

StateFun 中比较核心的地方是 State 的使用,下面来看下这个示例中 Function 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
final class GreetStatefulFunction implements StatefulFunction {

/**
* The function type is the unique identifier that identifies this type of function. The type, in
* conjunction with an identifier, is how routers and other functions can use to reference a
* particular instance of a greeter function.
*
* <p>If this was a multi-module application, the function type could be in different package so
* functions in other modules could message the greeter without a direct dependency on this class.
*/
// 定义这个 Function 的 FunctionType
static final FunctionType TYPE = new FunctionType("apache", "greeter");

/**
* The persisted value for maintaining state about a particular user. The value returned by this
* field is always scoped to the current user. seenCount is the number of times the user has been
* greeted.
*/
// 声明持久化状态信息
@Persisted
private final PersistedValue<Integer> seenCount = PersistedValue.of("seen-count", Integer.class);

@Override
public void invoke(Context context, Object input) {
// Function 真正的处理逻辑:得到处理之后的 response 后,再为其指定 target address,这里的 target address 就是 Egress 的信息
GreetRequest greetMessage = (GreetRequest) input;
GreetResponse response = computePersonalizedGreeting(greetMessage);
context.send(GreetingIO.GREETING_EGRESS_ID, response);
}

private GreetResponse computePersonalizedGreeting(GreetRequest greetMessage) {
final String name = greetMessage.getWho();
// 获取当前的状态
final int seen = seenCount.getOrDefault(0);
// 更新相应的状态
seenCount.set(seen + 1);

String greeting = greetText(name, seen);

return GreetResponse.newBuilder().setWho(name).setGreeting(greeting).build();
}

private static String greetText(String name, int seen) {
switch (seen) {
case 0:
return String.format("Hello %s ! \uD83D\uDE0E", name);
case 1:
return String.format("Hello again %s ! \uD83E\uDD17", name);
case 2:
return String.format("Third time is a charm! %s! \uD83E\uDD73", name);
case 3:
return String.format("Happy to see you once again %s ! \uD83D\uDE32", name);
default:
return String.format("Hello at the %d-th time %s \uD83D\uDE4C", seen + 1, name);
}
}
}

StateFun API 是非常简洁的,在使用 State 时,只需要通过 Persisted 注解修饰即可,否则不会保存到 Flink State 中,也就不会进行容错,在底层的实现上,它通过反射来找到一个 Function 中声明的变量信息,并将其注册到 Flink State 中,如果不通过注解修饰,就无法获取这个 State 变量。

总结

StateFun 2.0 发布之后,其生产性可用提高很多,它已经可以完全与 JVM 解耦,并且可以很好地利用 FAAS 的扩展能力,但是底层的 state 及数据转发依然受限于 Flink Job 的限制,无法完全做到自动伸缩,在大规模数据量的场景下,其可用性及可靠性有待验证,不过 StateFun 现在还在发展中,未来也不是没有机会。


参考

  1. Github Flink-StateFun
  2. Stateful Functions Documentation
  3. Stateful Functions 2.1.0 Release Announcement
  4. Stateful Functions 2.0 - An Event-driven Database on Apache Flink
  5. Stateful Functions Internals: Behind the scenes of Stateful Serverless