本篇是 Flink StateFun 的第二篇文章,文中的内容是来自 Stateful Functions Internals: Behind the scenes of Stateful Serverless 的翻译,这篇文章从上层把 Flink StateFun 的内核做了一个比较深入的介绍,个人认为它是一篇很不错的、用来了解 StateFun 内部机制的文章。

Stateful Functions (StateFun) 的出现简化了分布式有状态应用的构建,它将有状态流处理(有状态的强一致性保证)与事件驱动的 FAAS 平台(基于云原生架构带来的弹性和 Serverless 体验)结合起来。一个典型的 StateFun 应用包括两个部分:使用现代平台(kubernetes 等)部署 FAAS 服务以及一个 StateFun 集群,StateFun 集群扮演着事件驱动数据库的角色,来为 Functions 的状态和 Event 提供一致性和容错性保证。

那么,StateFun 内部是如何实现的呢?一个 StateFun 集群是如何与这些 Functions 通信呢?本篇文章就带大家深入了解一下 StateFun Runtime 的内部实现原理(文中的示例是完全部署在 AWS 上运行的)。本篇文章的主要目标就是让读者能够比较清楚地理解 StateFun Runtime 与 Functions 之间的交互,以及如何开发一个 Stateful Serverless 应用,并且能够将应用部署到类似于 GCP 或 Microsoft Azure 之类的云平台上。

一个 StateFun 示例: Shopping Cart

这里先来看下一个示例 —— a shopping cart application(购物车应用),下图展示了这个示例中涉及到的两个 Functions、Functions 中维护的 state 以及两个 Function 之间传递的 msg 类型:

 An overly simplified shopping cart application

本文的示例代码见 shopping_cart,这里使用的 Python SDK 开发。

这个应用包含了两个 Function:

  1. Cart Function:该函数每个实例都是与具体的 user 相关联,它的 state 记录了用户购物车中的商品信息(ItemsInCart);
  2. Inventory Function:这个函数主要是用于查询商品的库存信息,它维护了每件商品的库存信息(NumInStock)以及每件商品在所有用户购物车中的数量(NumReserved);

应用中的所有 Msg 都是通过逻辑地址发往相应的 Function 实例,这个逻辑地址会包含 Function Type 及 Intance ID 信息(如:cart:Kiminventory:socks)。本应用中发送到 Ingress 的数据类型是 AddToCart,它表示是一个将相应的商品加到用户的购物车中的操作,发送给 Egress 的类型是 AddToCartResult,它表示的是这个将商品添加到用户购物车中操作的结果(可能会因为库存情况加入失败)。

这几种数据类型定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
syntax = "proto3";

// ---------------------------------------------------------------------
// Shopping cart messages
// ---------------------------------------------------------------------

message AddToCart {
string item_id = 1;
int32 quantity = 2;
}

message AddToCartResult {
enum Type {
SUCCESS = 0;
FAIL = 1;
}

Type type = 1;
string item_id = 2;
int32 quantity = 3;
}

// ---------------------------------------------------------------------
// Shopping cart state type
// ---------------------------------------------------------------------

message ItemsInCart {
map<string, int32> items = 1;
}

Cart Function 是用来处理 AddToCart 类型数据的,它会在应用逻辑中再触发其他的 Function,为了简化这个示例,这里在两个 Function 之间传递的数据只抽象了两种简单的数据类型:

  • RequestItem:从 Cart Function 发送到 Inventory Function 的请求类型(用来查询商品库存);
  • ItemReservedInventory Function 返回的结果(表示可以加到购物车的商品数量)。

Stateful Functions Runtime 是如何工作的?

上面已经详细介绍了购物车应用示例的处理逻辑,这部分注重看一下 StateFun Cluster 是如何保证 Functions 状态及 msg 发送的一致性和容错的。

Simplified view of a StateFun app deployment

StateFun Runtime 是构建在 Apache Flink 之上,并且基于 Flink 的底层机制 —— co-location of state and messaging 来保证一致性和容错性。在一个 StateFun 应用中,所有 messages 的路由转发都是经过 StateFun Cluster 的,包括从 Ingress 中发送的数据、Functions 之间传输的数据以及 Function 发往 Egress 的数据。而且,Function 的 state 都是在 StateFun Cluster 中维护的,如同 Flink 应用一样,StateFun Cluster 中 messages 与 Function State 是 co-partitioned 的,所以计算都是本地 state 访问,而且都是没有任何负作用的原子操作。

这里举个例子,假设一条 target 逻辑地址为 (cart, "Kim") 的 message 经过 StateFun Cluster 路由转发,这个逻辑地址将被用做数据传输和 state 的 partition key(对应的 Flink 作业中就是 keyby 操作中的 key 值),这样的话,StateFun Cluster 接收到的数据都具有本地 state 可用性。与 Flink 相比,StateFun 的区别在于实际的计算逻辑不会发生在 StateFun Cluster Partitions 中,而是由远程 Function Service 来触发。那么 StateFun 是如何做到将 message 路由转发到远程 Function Service、并且提供【如同 state 和计算都在一起的一致性保证的】 state 访问的呢?

Remote Invocation Request-Reply Protocol

StateFun Cluster Partition 与 Function 的交互使用的是一个简洁、定义优雅的 request-reply 协议,如下图所示。一旦 Cluster Partition 接收到相应的 message,就会通过 HTTP 请求根据 target 逻辑地址将其发送到相应的 target Function Service 中。请求的 body 中会包含 input events 和这个 Function 计算需要的状态信息(从本地获取),在 Function 处理完请求后,会将需要返回的结果集合及所有变化的 state 作为 Service Response 都发送回 StateFun Cluster。当 StateFun Cluster Partition 接收到 Response 后,所有的 state 变化都会被写会到本地 State 中,message 会根据 target 逻辑地址路由转发到其他 Cluster Partition 中,触发其他的 Function 调用。

The remote invocation request/reply protocol

在这个框架下,StateFun SDKs 如 Python SDK 以及其他语言的 SDK 都可以基于这个协议来实现;从用户的角度来看,他们部署的 Function 操作的状态都像是本地状态一样,而实际上,这些都是由 StateFun 来维护和保证的,并且通过 HTTP/gRPC 协议来交互。

Function state consistency and fault-tolerance

StateFun Runtime 端会保证在任何时刻,每条 event(如 (cart, "Kim"))只会进行一次触发调用,并且每个实体的触发都是串行进行的(可以理解为一个 StateFun Cluster Partition 上一个 Function 的触发操作都是串行的),如果对于一个实体来说,一个 Function
正在触发,那么新到的数据将会被缓存在 state 中,只有正在进行的触发结束后才能处理后面的请求。另外,因为请求是串行发送,它保证了每个请求都是完全隔离的,并且由于一个请求会将需要的所有信息都放在请求中,所以 Function 的触发是完全幂等的操作(这可以原生地避免 Function 在调用故障时可能会出现的一致性问题)。

关于容错机制,所有由 StateFun Cluster 管理的 Function state 会利用 Flink 原生的分布式快照机制周期性、异步地产生 Checkpoint,并且存储到 HDFS/GCS 这类的远程文件系统。这些 Checkpoint 会包含这个应用所有 Function 的全局一致性状态快照,并且包括 Ingress 中的 offset 信息和 Egress 中正在进行的事务状态信息。如果应用因为某些异常而挂掉,系统会从最新一次成功的 Checkpoint 中恢复,所有 Function 的状态信息都会被恢复、在 Checkpoint 与系统 Crash 之间的 event 也都会按照之前同样的逻辑进行处理,就好像失败从未发生一样。

Step-by-step walkthrough of function invocations

在这一小节,通过上面那个购物车的示例,来看下一条真实的 event 是如何在 StateFun Cluster 与 Function 之间传递的。顾客 Kim 想将 2 双袜子(sock)添加到其购物车中,这条 event 触发的一系列操作如下图所示:

Message flow walkthrough

结合上图,下面一步步来看下这条 event 的处理过程:

  1. 一条 Event AddToCart("Kim", "socks", 2) 从 Ingress Partition 中发送出来 (1),在这个应用中,Ingress event router 配置的 Function Type 是 Cart Function,并且使用 user ID Kim 作为 Instance ID。Function Type 和 Instance ID 它们会确定这个 event 的 target 逻辑地址((cart:Kim));
  2. 这里假设这条 event 是被 StateFun partition B 读取到的,但是 (cart:Kim) 的地址实际上应该路由到 partition A,因此,这条 event 会先被路由到 partition A(2)
  3. StateFun partition A 接收到这条 event 后开始做相应的处理:
    1. 首先,先从本地状态中获取 (cart:Kim) 的状态信息 —— Kim 购物车中已经存在商品列表 (3)
    2. 接着,它会标记 (cart:Kim)busy 的状态,除非当前的 event 处理完,否则不会再处理其他的 event 信息(先将后面的请求其缓存起来),这样可以避免状态一致性的问题;
    3. StateFun Runtime 会通过一个 HTTP Client 向 Cart Function Service 发送请求 (4),这个请求会包含 AddToCart("Kim", "socks", 2) 数据及当前 (cart:Kim) 的状态信息(这里要注意的是,每个请求的路由转发,都会将这个状态信息作为请求的一部分发送到 Function Service 中,这是一个比较有意思的设计);
    4. 远程 Cart Function Service 在接收到数据后,会尝试查询一下库存状态(通过 Inventory Function Service 来查询),因此,它会返回一个 target 逻辑地址为 (inventory:socks)RequestItem("socks", 2) 请求。在这里,经过 Cart Function Service 处理后的任何状态变化都会随着请求返回给 StateFun Cluster 中 (5)
    5. StateFun Runtime 接收到 response 后,再将 RequestItem 信息路由到其他的 Partition 上,并且将 (cart:Kim) 标记为可用状态;
  4. 这里假设 (inventory:socks) 的地址应该路由到 partition B 上,这里,会将对应的 event 再路由转发到 partition B(6)
  5. 一旦 partition B 接收到 RequestItem msg 后,Runtime 将会再次按照上面类似的逻辑进行相应触发 (7)

通过这个示例,我们可以清晰地看到一条 event 在 StateFun Cluster 中的处理流程,对于理解其内部机制很有帮助。

这里比较有意思的点是流程 2 和 6,本质上 StateFun Cluster Partition 代表的是 Flink Job 中具体执行的 Task,Stateful Function 在实现时增加了一个 Feedback Loop 支持,来使得数据流的传输不受限于 DAG 的限制,在 StateFun 中,真实的数据流还可以是有环的,这个将会在下一篇文章中给大家揭秘其内部机制。

总结

Stateful Functions Internals: Behind the scenes of Stateful Serverless 这篇文章的最后是关于在公有云平台部署的介绍,我们就不再详述了,本文通过一个应用示例把 StateFun 内部的实现机制给大家做了一个简单的介绍,比较核心的内容都有所涉及,对于想了解 StateFun 内部原理的同学,本文应该就足以让我们有个清晰的认识。因为之前对 Flink StateFun 做过一些调研,把 StateFun 源码的核心流程简单看了一遍,在下篇文章中将会针对 StateFun 的具体实现做一个梳理,更深入地介绍一下 StateFun 的实现。