Storm的配置文件一般存放在$STORM_HOME/conf下,通常名为storm.yaml,它符合yaml格式要求。Storm的配置参数对任务的稳定运行以及吞吐率至关重要,这里介绍一下storm常见的配置项参数。

storm基本配置

  • storm.local.dir: nimbus 和supervisor进程存储一些状态信息(conf或者jars)的本地路径,需要每台storm node单独创建该目录并保证该目录有正确的读写权限;
  • storm.log4j2.conf.dir: log4j的配置目录;
  • storm.zookeeper.servers: storm严重依赖zookeeper存储状态信息,要保证zookeeper的高可用,最好设置多个zk地址;
  • storm.zookeeper.port: 默认2181;
  • storm.zookeeper.root: 在zookeeper中存储的根目录,如果多个storm集群公用一个zk集群,需要修改其根目录名称即可;
  • storm.session.timeout: 默认20s,nimbus和supervisor和zk的session超时时间,如果log中常用sessiontimeout错误,考虑增加其值或者修改gc参数。该值并不能无限设置,zk有自己的最大session时间(默认20 ticktime);
  • storm.zookeeper.connection.timeout:连接超时时间;
  • storm.zookeeper.retry.times: 默认5,执行zk操作重试次数;
  • storm.zookeeper.retry.interval: 默认1000,即间隔1s;
  • storm.zookeeper.retry.intervalceiling.millis: 300000 (5分钟)执行重试的时间间隔最长时间;
  • storm.messaging.transport:”backtype.storm.messaging.netty.Context” task之间的消息传输协议,默认使用netty传输;
  • storm.cluster.mode: “distributed” storm集群模式;
  • storm.id:运行中拓扑的id,由storm name和一个唯一随机数组成;
  • storm.local.mode.zmq:Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用java消息系统。默认为false;
    注:Storm严重依赖zookeeper,而且zk在分布式使用中扮演了非常重要的角色。

nimbus相关设置

  • storm.nimbus.retry.times: 5 nimbus操作的重试次数
  • storm.nimbus.retry.interval.millis: 2s 重试间隔
  • storm.nimbus.retry.intervalceiling.millis: 60000 最大重试时间 10分钟
  • nimbus.seeds: [] 用于leader nimbus发现的nimbus hosts 列表,解决nimbus的单点故障问题,代替了原来的nimbus.host 配置
  • nimbus.thrift.port: 6627 nimbus工作的thrift端口,客户端上传jar和提交拓扑的端口(nimbus的thrift监听端口)
  • nimbus.thrift.threads: 64 nimbus thrift 线程数目
  • nimbus.thrift.max_buffer_size: 1048576 1m
  • nimbus.childopts: “-Xmx1024m” nimbus java 进程jvm设置
  • nimbus.task.timeout.secs:30 与task没有心跳时多久nimbus可以认为该task已经死掉并且可以重新分配该task
  • nimbus.supervisor.timeout.secs: 60 一分钟没有心跳 nimbus可以认为该supervisor已经dead,不会分配新的work
  • nimbus.monitor.freq.secs: 10 nimbus多久查询下supervisor心跳信息并且重新分配工作。注意当一台机子曾经挂掉,nimbus会立即采取一些操作
  • nimbus.reassign:当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改。
  • nimbus.cleanup.inbox.freq.secs: 600 多久时间启动清理inbox文件的线程
  • nimbus.inbox.jar.expiration.secs: 3600 一个小时 jar过期时间
  • nimbus.code.sync.freq.secs: 300 5分钟同步一次未执行的拓扑的代码
  • nimbus.task.launch.secs: 120 用于task 第一次启动时的超时时间
  • nimbus.file.copy.expiration.secs: 600 上传下载文件超时时间
  • nimbus.topology.validator: “backtype.storm.nimbus.DefaultTopologyValidator” 拓扑验证,控制该拓扑是否可以执行
  • topology.min.replication.count: 1 当nimbus seeds中该拓扑代码的备份达到最小数目时leader nimbus才可以执行拓扑动作。
  • topology.max.replication.wait.time.sec: 60 当代码备份在nimbus list中达到topology.min.replication.count设置的最大等待时间,如果超时,不管有没有最小备份个数,都要执行该拓扑

supervisor相关配置

  • supervisor.slots.ports: 设置当台机子上可跑的worker数目,每个worker对应一个port,通常情况下多少个cpu core就设置多少个worker,类似与hadoop中nodemanager中slot的设置
  • supervisor.childopts: “-Xmx256m” supervisor jvm参数设置
  • supervisor.worker.start.timeout.secs: 120 supervisor等待worker启动的最长时间
  • supervisor.worker.timeout.secs: 30 worker的最长超时时间
  • supervisor.worker.shutdown.sleep.secs: 1秒 supervisor shutdown worker需要等待的时间
  • supervisor.monitor.frequency.secs: 3s检查一次worker的心跳确保是否要重启这些worker
  • supervisor.heartbeat.frequency.secs: 5s supervisor和nimbus心跳的频率
  • supervisor.enable: true supervisor是否要启动分配它的worker

worker 配置

  • worker.childopts: “-Xmx768m”
  • worker.gc.childopts: “” worker gc set 可以被topology.worker.gc.childopts.覆盖
  • worker.heartbeat.frequency.secs: 1 worker 和supervisor的heartbeat时间
  • topology.worker.receiver.thread.count: 1 每个worker设置的receiver 线程个数
  • task.heartbeat.frequency.secs: 3s task向nimbus发送心跳的频率
  • task.refresh.poll.secs: 10 多久和其他task同步连接(如果task重新分配,发往该task信息的那些task需要重练他们之间的连接)

message传递相关参数

  • storm.messaging.netty.server_worker_threads:1, server端接收信息的线程个数
  • storm.messaging.netty.client_worker_threads: 1, client端发送信息的线程个数
  • storm.messaging.netty.buffer_size: 5M,netty buffer大小
  • storm.messaging.netty.max_retries: 300 重试次数
  • storm.messaging.netty.max_wait_ms: 1000ms=1s 最大等待时间要大于task launchtime and worker launch time默认120s,重连间隔要大于zk的sessiontimeout 以确保worker是否已挂
  • storm.messaging.netty.min_wait_ms: 100
  • storm.messaging.netty.transfer.batch.size: 262144 如果netty 发送消息非常忙,client客户端可以batch发送消息,否则尽快的flush消息以减少延迟。

注:我们的使用场景是storm kafka读取kafka里面的数据,发现运行一段时间以后,kafka消费的offset不再更新(2s更新一次消费的offset),spout 中的task不断重启导致offset一直不更新,查看log发现task失败的原因是gc设置不对以致netty连接超时,task会重新分配。在生产环境中应该调大该值,我们在线上设置该值为20s,并且调整worker的gc参数。

topology相关的设置, 针对特定拓扑的配置

正如上一讲的,以下参数控制消息是否被完全处理:

  • topology.enable.message.timeouts: true 保证数据完全处理;
  • topology.acker.executors: null 设置acker线程个数;
  • topology.message.timeout.secs: 30 当一个消息的处理超时多长时间多少认为该tuple处理失败;
  • topology.max.spout.pending: null 当spout 发送一个tuple时会将该tuple放到一个pending list,此字段控制在storm中处理的spout tuple数,可以根据超时时间以及每秒处理的消息数估算;

下面这几个参数和拓扑的并行度(parallelism),并行度的概念就是为该拓扑启动的线程数,TopologyBuilder#setSpout() 和TopologyBuilder#setbolt()中可以指定excutor数目,该excutor是从worker进程spawn的线程,task是处理数据的实际工作单元,跑在一个excutor上。

  • topology.workers: 1,Config#setNumWorkers,设置worker数,一个worker执行一个拓扑的一个子集任务,其上可以跑多个excutors,可能是多个bolt或者spout;
  • topology.tasks: null,top.setNumtasks,设置task数目;
  • topology.max.task.parallelism: null,拓扑最大线程数;

worker gc设置

  • topology.worker.childopts: null;
  • topology.worker.logwriter.childopts: “-Xmx64m”;
  • topology.worker.shared.thread.pool.size: 4 worker task 共享线程池大小;

worker内消息传送的参数,与disruptor相关:

  • topology.executor.receive.buffer.size: 1024 #batched;
  • topology.executor.send.buffer.size: 1024 #individual messages;
  • topology.transfer.buffer.size: 1024 # batched;
  • topology.disruptor.wait.strategy:com.lmax.disruptor.BlockingWaitStrategy 延迟和吞吐率权衡;
  • topology.disruptor.wait.timeout.millis: 1000 延迟和cpu使用权衡,使用长延时时会减少cpu使用,减少等待时间可以保证延时小,但cpu负载高;

其他参数配置:

  • topology.debug: false debug模式关闭;
  • topology.tick.tuple.freq.secs: null 用于定时处理逻辑的拓扑使用;
  • topology.spout.wait.strategy: “backtype.storm.spout.SleepSpoutWaitStrategy” 两种情形下等待1. no data 2. 达到最大pending大小;
  • topology.sleep.spout.wait.strategy.time.ms: 1 sleep时间;

drpc logview ui的设置就不介绍了。

通过上面所讲的设置参数,可以发现gc设置和zookeeper设置非常重要,而且在message传输的相关设置中,有一系列参数如:receiver buffer size 、transfer buffer size,transfer buffer size,以及netty receiver thread 和worker receiver count 又有什么关系,上面提到到disruptor是什么东东,还有很多问题。所以在设置参数这个主题下还会再详细介绍下面三个问题:

  • jvm 参数设置和调整,这个对于java程序的稳定运行至关重要(尤其是在大数据平台下,因为目前开源的大数据工具大部分都是java写的,jvm调优是必不可少的;
  • zookeeper的维护和管理,在hadoop生态系统以及分布式系统中中必不可少的工具
  • storm worker间和worker内的消息传递,包含disruptor的使用,通过这一节的介绍就会知道buffer size 设置为啥都是2的幂次方,这里buffer size 的单位是字节还是其他等等一些疑惑

参考: