本文是参考网上的博客以及一些书籍根据自己的一些理解整理得到的,主要是为了更好地理解storm的内部机制(当时使用Storm的版本是0.9.3)。

基础

Storm的Topology模型

一个storm Topology的一般模型为:

topology

tuple

storm中传输的数据类型是tuple,tuple到底是什么?感觉还是用英语来说比较容易理解吧,”A tuple is a named of values where each value can be any type.” tuple是一个类似于列表的东西,存储的每个元素叫做field(字段)。我们用getString(i)可以获得tuple的第i个字段。而其中的每个字段都可以任意类型的,也可以一个很长的字符串。我们可以用:

1
2
String A = tuple.getString(0);
long a= tuple.getLong(1);

来拿到我们所需的数据,不过前提你是要知道你的tuple的组成。具体tuple是什么类型,完全取决于自己的程序,取决于spout中nextTuple()方法中emit发送的类型。
storm的streams就是一个无限的tuple流,我们可以把storm tuples当做CEP(complex event processing)中events来理解。

spout

spout是storm topology的数据入口,连接到数据源,将数据转换为一个个tuple,并将tuple作为数据流进行发射。一个spout可以供多个topology使用。通常spouts从外部资源读取元组,然后发射元组到拓扑中(例如,Kestrel队列或Twitter API)。Spouts即可以是可靠的,也可是不可靠的。可靠的spout可以重新执行一个失败元组,但一个不可靠的spout一发射元组就会忘记它。

Spouts可以发射多个流。要发射多个流,使用OutputFieldDeclarer的declareStream方法声明多个流,并在使用SpoutOutputCollector的emit方法时指定流ID。但是由于由spouts和bolts组成的单流应用最为普遍,因此OutputFieldDeclarer提供便利的方法声明一个不需要指定ID的单流,此时,流被分配一个默认ID为“default”。

Spouts的重要方法是nextTuple方法。nextTuple方法发射一个新的元组到拓扑,或如果没有新的元组发射,简单的返回。注意任务spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。

Spout的另外两个重要方法是ack和fail方法。当spout发射的元组被拓扑成功处理时,调用ack方法;当处理失败时,调用fail方法。Ack和fail方法仅被可靠spouts调用。

bolt

bolt可以理解为计算机程序中的运算或函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。一个bolt可以订阅(subscribe)多个由spout或其他bolt发射的数据流。

Topology中的所有处理都在bolts中完成。Bolts什么都可以做,如过滤、业务功能、聚合、连接(合并)、访问数据库等等。

Bolts可以做简单的流转换。复杂的流转换经常需要多步完成,因此也需要多个bolts。例如,转换tweets数据流到流行图片数据流至少需要两步:一个bolt 对retweets的图片进行滚动计数,另外的bolt找出Top X(前几位)的图片(你可以用更具伸缩性的方式处理这部分流)。

Bolts可以发射多个流。要发射多个流,使用OutputFieldDeclarer的declareStream方法声明多个流,并在使用SpoutOutputCollector的emit方法时指定流ID。

当你声明一个bolt的输入流时,你总是以另一个组件的指定流作为输入。如果你想订阅另一个组件的所有流,你必须分别订阅每一个流。InputDeclarer提供了使用默认流ID订阅流的语法糖,调用declarer.shuffleGrouping(“1”)订阅组件“1”上的默认流,作用等同于declarer.shuffleGrouping(“1”, DEFAULT_STREAM_ID)。

Bolts的主要方法是execute方法,任务在一个新的元组输入时执行该方法。Bolts使用OutputCollector对象发射新的元组。Bolts必须对每个处理的元组调用OutputCollector的ack方法,因此storm知道这个元组完成处理(并且能最终确定ack原始元组是安全的)。一般情况,处理一个输入元组,基于此元组再发射0-N个元组,然后ack输入元组。Strom提供了一个IBasicBolt接口自动调用ack方法。

在Bolts中载入新的线程进行异步处理。OutputCollector是线程安全的,并随时都可调用它。

Streams

Storm核心的抽象概念是“流”。流是一个分布式并行创建和处理的无界的连续元组(tuple)。流通过一种模式来定义,该模式是给流元组中字段命名。默认情况下,元组可以包含整型、长整型、短整型、字节、字符串、双精度浮点数、单精度浮点数、布尔型和字节数组。您还可以自定义序列化,在元组中使用自定义类型。

而消息流Streams是storm里的最关键的抽象。一个消息流是一个没有边界的tuple序列,而这些tuples会被以一种分布式的方式并行地创建和处理。对消息流的定义主要是对消息流里面的tuple的定义, 我们会给tuple里的每个字段一个名字。 并且不同tuple的对应字段的类型必须一样。 也就是说: 两个tuple的第一个字段的类型必须一样, 第二个字段的类型必须一样, 但是第一个字段和第二个字段可以有不同的类型。 在默认的情况下, tuple的字段类型可以是: integer, long, short, byte, string, double, float, boolean和byte array。 你还可以自定义类型 — 只要你实现对应的序列化器。

storm并发机制

在 Storm 的间接中提到过,Storm 计算支持在多台机器上水平扩容,通过将计算切分为多个独立的 tasks 在集群上并发执行来实现。在 Storm 中,一个 task 可以简单地理解为在集群某节点上运行的一个spout 或者 bolt 实例。

  • Nodes: 指配置在一个 Storm 集群中的服务器,会执行 topology 的一部分运算。一个 Storm 集群可以包括一个或者多个工作 node。

  • Workers(JVM虚拟机):指一个 node 上相互独立运行的 JVM 进程。每个 node 可以配置运行一个或者多个 worker。一个 topology 会分配到一个或者多个 worker 上运行。

  • Executeor:指一个 worker 的 jvm 进程中运行的 Java 线程。多个 task 可以指派给同一个 executer 来执行。除非是明确指定,Storm 默认会给每个 executor 分配一个 task。
  • Task:task 是 spout 和 bolt 的 实 例, 它 们 的 nextTuple() 和execute() 方法会被executors 线程调用执行。

默认的并发机制

在我们修改 topology 的并发度之前,先来看默认配置下 topology 是如何执行的。假设
我们有一台服务器(node),为 topology 分配了一个 worker,并且每个 executer 执行一个
task。我们的 topology 执行过程如下图:

Node1

正如在图中看到的,唯一的并发机制出现在线程级。每个任务在同一个 JVM 的不
同线程中执行。如何增加并发度以充分利用硬件能力?让我们来增加分配给 topology 的
worker 和 executer 的数量。

给topology增加worker

增加额外的 worker 是增加 topology 计算能力的简单方法。为此 Storm 提供了 API 和修改配置项两种修改方法。无论采取哪种方法,spout 和 bolt 组件都不需要做变更,可以直接复用。
为了增加分配给一个 topology 的worker 数量,只需要简单的调用一下Config对象的setNumWorkers()方法:

1
2
Config.config = new Config();
config.setNumworkers(2);

这样就给 topology 分配了两个 worker 而不是默认的一个。从而增加了 topology 的计算资源,也更有效的利用了计算资源。我们还可以调整 topology 中的 executor 个数以及每个 executor 分配的 task 数量。

配置executor和task

我们已经知道,Storm 给 topology 中定义的每个组件建立一个 task,默认的情况下,每个 task 分配一个 executor。Storm 的并发机制 API 对此提供了控制方法,允许设定每个task 对应的 executor 个数和每个 executor 可执行的 task 的个数。
在定义数据流分组时,可以设置给一个组件指派的 executor 的数量。为了说明这个功能,修改 topology 的定义代码,设置 SentenceSpout 并发为两个 task,每个 task 指派各自的 executor 线程。

1
builder.setSpout (SENTENCE_SPOUT_ID, spout, 2);

如果只使用一个 worker,topology 的执行如下图所示:

Node2

我们给语句分割 bolt SplitSentenceBolt 设置 4 个 task 和 2 个 executor。每个executor 线程指派 2 个 task 来执行(4/2=2)。还将配置单词计数 bolt 运行四个 task,每个task 由一个 executor 线程执行:

1
2
3
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);

builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_SPOUT_ID, new Fields("word"));

在2个worker的情况下,topology的执行如图所示:

Node3

要重点指出的是,当 topology 执行在本地模式时,增加 worker 的数量不会达到提高速度的效果。因为 topology 在本地模式下是在同一个 JVM 进程中执行的,所以只有增加 task 和 executor 的并发度配置才会生效。Storm 的本地模式提供了接近集群模式的模拟,对开发是否有帮助。但程序在投入生产环境之前,必须在真实的集群环境下进行测试。

数据流分组

Storm 定义了七种内置数据流分组的方式:

  • Shuffle grouping(随即分组):这种方式会随机分发 tuple 给 bolt 的各个 task,每个bolt 实例接收到的相同数量的 tuple;
  • Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“ word ”字段进行分组,所有具有相同“ word ”字段值的 tuple 会路由到同一个 bolt 的 task 中;
  • All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅数据流的 task 都会接收到 tuple 的拷贝;
  • Globle Grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。Storm 按照最小的 task ID 来选取接收数据的 task。注意,当使用全局分组方式时,设置 bolt 的 task 并发度是没有意义的,因为所有 tuple 都转发到同一个 task 上了。使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可能会引起 Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃;
  • None grouping(不分组):在功能上和随机分组相同,是为将来预留的;
  • Direct gouping(直接分组/指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用;
  • Local or shuffle grouping(本地或随即分组):和随机分组类似,但是,会将 tuple 分发给同一个 worker 内的 bolt task(如果 worker 内有接收数据的 bolt task)。其他情况下,采用随机分组的方式。取决于 topology 的并发度,本地或随机分组可以减少网络传输,从而提高 topology 性能。

除了预定义好的分组方式之外,还可以通过实现 CustomStreamGrouping(自定义分组)
接口来自定义分组方式:

1
2
3
4
5
6
7
8
9
10
11
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.List;

public interface CustomStreamGrouping extends Serializable {
void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);//prepare()方法在调用时,用来初始化分组信息,分组的具体实现会使用这些信息决定如何向接收task分发tuple。WorkerTopologyContext 对象提供了topology的上下文信息,GlobalStreamId提供了待分组数据流的属性,targetTasks时分组所有待选task的标识符列表。

//会将 targetTasks 的引用存在变量里作为 chooseTasks() 的参数
List<Integer> chooseTasks(int taskId, List<Object> values); //chooseTasks() 方法返回一个 tuple 发送目标 task 的标识符列表。它的两个参数是发送tuple 的组件的 id 和 tuple 的值。
}

可靠的消息处理机制

Storm 提供了一种 API 能够保证 spout 发送出来的每个 tuple 都能够执行完整的处理过程。

在storm里一个tuple被完全处理的意思是:这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个tuple在timeout所指定的时间内没有成功处理。而这个timeout可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定。

作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

spout的可靠性

在有保障数据的处理过程中,bolt每收到一个 tuple,都需要向上游确认应答(ack)者报错。对主干 tuple 中的一个 tuple,如果 tuple 树上的每个 bolt 进行了确认应答,spout 会调用 ack 方法来标明这条消息已经完全处理了。如果树中任何一个 bolt 处理 tuple 报错,或者处理超时,spout 会调用 fail方法。
tuple树的结构如图:

tuple_tree

Spout的nextuple()发送一个tuple,为了实现可靠的消息处理,首先要给每个发出的tuple带上唯一的ID,并且将ID作为参数传递给SpoutOutputCollector的emit()方法:

1
collector.emit(new Values("value1","value2"),msgId);

接下来,这个发射的tuple被传送到消息处理者bolt那里,storm会跟踪由此所产生的这课tuple树。如果storm检测到一个tuple被完全处理了,那么storm会以最开始的那个message-id作为参数来调用消息源的ack方法;反之storm会调用spout的fail方法。要注意的是,storm调用ack或者fail的task始终是产生这个tuple的那个task。所以如果一个spout被分成很多个task来执行,消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

给tuple指定ID告诉Storm系统,无论执行成功还是失败,spout都要接收所有发出tuple返回的通知。如果处理成功,spout的ack()方法将会对编号是ID的消息应答确认,如果执行失败或者超时,会调用fail()方法。

bolt的可靠性

bolt要实现可靠的消息处理机制要包含两个步骤:

  1. 当发射衍生的tuple时,需要锚定读入的tuple;
  2. 当处理消息成功或者失败时分别确认应答或者报错。

由一个tuple产生一个新的tuple称为:anchoring(锚定)。你发射一个新tuple的同时也就完成了以西anchring。

锚定一个tuple的意思是,建立读入tuple和衍生出的tuple之间的对应关系,这样下游的bolt就可以通过应答确认,报错或超时来加入到tuple树结构中。
可以通过调用OutputCollect中emit()的一个重载函数锚定一个或者一组tuple:

1
collector.emit(tuple, new Values(word));//anchoring

这里,我们将读入的tuple和发射的新tuple锚定(anchoring)起来,下游的bolt就需要对输出的tuple进行确认应答或者报错。因为这个tuple被anchoring在上一个tuple, 这整个就构成了tuple树,如果这一级tuple处理出错了,那么这整个tuple处理过程都会被重新处理。

另外一个emit()方法会发射非锚定的tuple:

1
collector.emit(new Values(word));// unanchoring

用这种方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), unanchoring的tuple不会对数据流的可靠性起作用。如果一个unanchoring的tuple在下游处理过程中失败了,原始的根tuple是不会重新发送,到底要anchoring还是要 unanchoring则完全取决于你的业务需求。

当处理完成或者发送了新tuple之后,可靠数据流中的bolt需要应答读入的tuple:

1
this.collector.ack(tuple);

如果处理失败,这样的话spout必须发射tuple,bolt就要明确地对处理失败的tuple报错:

1
this.collector.fail(tuple);

如果因为超时的原因,或者显式调用OutputCollector.fail()方法,spout都会重新发送源是的tuple。

每个处理tuple,都必须进行ack或者fail。因为storm会追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple,那么最终年会看到OutOfMemory错误。

对于SplitSentence这一部分,如果用IRichBolt来做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SplitSentence implements IRichBolt {
OutputCollector _collector;

public void prepare(Map conf,
TopologyContext context,
OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}

public void cleanup() {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以写成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
}

public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}

public void cleanup() {
}

public void declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

这个实现比之前的实现简单多了, 但是功能上是一样的。发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。

acker

storm里面有一类特殊的task称为:acker, 他们负责跟踪spout发出的每一个tuple的tuple树。当acker发现一个tuple树已经处理完成了。它会发送一个消息给产生这个tuple的那个task。你可以通过Config.TOPOLOGY_ACKERS来设置一个topology里面的acker的数量, 默认值是一。 如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点.

理解storm的可靠性的最好的方法是来看看tuple和tuple树的生命周期, 当一个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予一个64位的id,而acker就是利用这个id去跟踪所有的tuple的。每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。所以当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化。具体来说就是:它告诉acker: 我呢已经完成了, 我有这些儿子tuple, 你跟踪一下他们吧。下面这个图演示了C被ack了之后,这个tuple树所发生的变化。
tuple ack示例:

acker

关于storm怎么跟踪tuple还有一些细节, 前面已经提到过了, 你可以自己设定你的topology里面有多少个acker。而这又给我们带来一个问题, 当一个tuple需要ack的时候,它到底选择哪个acker来发送这个信息呢?

storm使用一致性哈希来把一个spout-tuple-id对应到acker, 因为每一个tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪个acker来ack。(这里所有的祖宗是指这个tuple所对应的所有的根tuple。这里注意因为一个tuple可能存在于多个tuple树,所以才有所有一说)。

storm的另一个细节是acker是怎么知道每一个spout tuple应该交给哪个task来处理。当一个spout发射一个新的tuple, 它会简单的发一个消息给一个合适的acker,并且告诉acker它自己的id(taskid), 这样storm就有了taskid-tupleid的对应关系。 当acker发现一个树完成处理了, 它知道给哪个task发送成功的消息.

acker task并不现式的跟踪tuple树。对于那些有成千上万个节点的tuple树,把那么多的tuple信息都跟踪起来会消耗太多的内存。相反,acker永了一种不同的凡是,使得对于每一个spout tuple所需要的内存量都是恒定的(20bytes)。这个跟踪算法是storm如何工作的关键。

Storm集群框架

nimbus守护进程

nimbus守护进程的主要职责是管理,协调和监控在集群上运行的topology。包括topology的发布,任务支派,事件处理失败时重新指派任务。

将topology发布到Storm集群,将预先打包的jar文件的topology和配置信息提交到nimbus服务器上,一旦nimbus接收到了topology的压缩包,会将jar包分发到足够数量的supervisor节点上。当supervisor节点接收到了topology压缩文件,nimbus就会指派task(bolt和spout实例)到每个supervisor并且发送信号指示supervisor生成足够的worker来执行指派的task。

nimbus记录所有supervisor节点的状态和分配给它们的task。如果nimbus发现某个supervisor没有上报心跳或者已经不可达了,它会将故障supervisor分配的task重新分配到集群中的其他supervisor节点。

严格意义上讲 nimbus 不会引起单点故障。这个特性是因为 nimubs 并不参与 topology 的数据处理过程,它仅仅是管理 topology 的初始化,任务分发和进行监控。实际上,如果 nimbus 守护进程在 topology 运行时停止了,只要分配的 supervisor 和worker 健康运行,topology 一直继续数据处理。要注意的是,在 nimbus 已经停止的情况下 supervisor 异常终止,因为没有 nimbus 守护进程来重新指派失败这个终止的 supervisor的任务,数据处理就会失败。

supervisor守护进程

supervisor守护进程等待nimbus分配任务后生成并监控workers(JVM进程)执行任务。supervisor和worker都是运行在不同的 JVM 进程上,如果由 supervisor 拉起的一个woker 进程因为错误(或者因为 Unix 终端的 kill-9 命令,Window 的 tskkill 命令强制结束)异常退出,supervisor 守护进程会尝试重新生成新的 worker 进程。

如果一个worker甚至整个supervisor节点都故障了,Storm怎么保障出错时正在处理的tuples的传输呢?答案就在Storm的tuple的锚定和应答确认机制中。当打开了可靠i传输的选项,传输到故障节点上的tuples将不会收到应答确认,spout会因为超时而重新发射原始的tuple。这样的过程会一直重复直到topology从故障中恢复开始正常处理数据。

Zookeeper的作用

Storm 主要使用 ZooKeeper 来协调一个集群中的状态信息,比如任务的分配情况,worker 的状态,supervisor 之间的 nimbus 的拓扑度量。nimbus 和 supervisor 节点之间的通信主要是结合 ZooKeeper 的状态变更通知和监控通知来处理的。

Storm程序框架

topology提交

本地模式

使用LocalCluster类将topolog运行在本地模式:

1
2
LocalCluster cluster=new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

一般得例子为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LocalRunningTopology extends ExclaimBasicTopo {

public static void main(String[] args) throws Exception {

LocalRunningTopology topo = new LocalRunningTopology();
Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, topo.buildTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
}

集群模式

提交一个topology到远程集群就非常简单了,只需要利用StormSubmitter类中同样的方法和名称:

1
StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createrTopology());

一般的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ClusterRunningTopology extends ExclaimBasicTopo {

public static void main(String[] args) throws Exception {

String topoName = "test";

ClusterRunningTopology topo = new ClusterRunningTopology();
Config conf = new Config();
conf.setDebug(true);

conf.setNumWorkers(3);

StormSubmitter.submitTopology(topoName, conf, topo.buildTopology());
}
}

实际的例子

在实际的程序里,本地和集群是混绑在一起的,用传入参数以示区别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception {

ExclaimBasicTopo topo = new ExclaimBasicTopo();
Config conf = new Config();
conf.setDebug(false);

if (args != null && args.length > 0) {
conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, topo.buildTopology());
} else {

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, topo.buildTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
}
}

Spout

spout

Spout最顶层抽象的是ISPout接口:

ISpout

Bolt

自带的Blot类的关系见下图:

bolt

IBolt