Storm之配置文件
Storm的配置文件一般存放在$STORM_HOME/conf
下,通常名为storm.yaml
,它符合yaml格式要求。Storm的配置参数对任务的稳定运行以及吞吐率至关重要,这里介绍一下storm常见的配置项参数。
storm基本配置
- storm.local.dir: nimbus 和supervisor进程存储一些状态信息(conf或者jars)的本地路径,需要每台storm node单独创建该目录并保证该目录有正确的读写权限;
- storm.log4j2.conf.dir: log4j的配置目录;
- storm.zookeeper.servers: storm严重依赖zookeeper存储状态信息,要保证zookeeper的高可用,最好设置多个zk地址;
- storm.zookeeper.port: 默认2181;
- storm.zookeeper.root: 在zookeeper中存储的根目录,如果多个storm集群公用一个zk集群,需要修改其根目录名称即可;
- storm.session.timeout: 默认20s,nimbus和supervisor和zk的session超时时间,如果log中常用sessiontimeout错误,考虑增加其值或者修改gc参数。该值并不能无限设置,zk有自己的最大session时间(默认20 ticktime);
- storm.zookeeper.connection.timeout:连接超时时间;
- storm.zookeeper.retry.times: 默认5,执行zk操作重试次数;
- storm.zookeeper.retry.interval: 默认1000,即间隔1s;
- storm.zookeeper.retry.intervalceiling.millis: 300000 (5分钟)执行重试的时间间隔最长时间;
- storm.messaging.transport:”backtype.storm.messaging.netty.Context” task之间的消息传输协议,默认使用netty传输;
- storm.cluster.mode: “distributed” storm集群模式;
- storm.id:运行中拓扑的id,由storm name和一个唯一随机数组成;
- storm.local.mode.zmq:Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用java消息系统。默认为false;
注:Storm严重依赖zookeeper,而且zk在分布式使用中扮演了非常重要的角色。
nimbus相关设置
- storm.nimbus.retry.times: 5 nimbus操作的重试次数
- storm.nimbus.retry.interval.millis: 2s 重试间隔
- storm.nimbus.retry.intervalceiling.millis: 60000 最大重试时间 10分钟
- nimbus.seeds: [] 用于leader nimbus发现的nimbus hosts 列表,解决nimbus的单点故障问题,代替了原来的nimbus.host 配置
- nimbus.thrift.port: 6627 nimbus工作的thrift端口,客户端上传jar和提交拓扑的端口(nimbus的thrift监听端口)
- nimbus.thrift.threads: 64 nimbus thrift 线程数目
- nimbus.thrift.max_buffer_size: 1048576 1m
- nimbus.childopts: “-Xmx1024m” nimbus java 进程jvm设置
- nimbus.task.timeout.secs:30 与task没有心跳时多久nimbus可以认为该task已经死掉并且可以重新分配该task
- nimbus.supervisor.timeout.secs: 60 一分钟没有心跳 nimbus可以认为该supervisor已经dead,不会分配新的work
- nimbus.monitor.freq.secs: 10 nimbus多久查询下supervisor心跳信息并且重新分配工作。注意当一台机子曾经挂掉,nimbus会立即采取一些操作
- nimbus.reassign:当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改。
- nimbus.cleanup.inbox.freq.secs: 600 多久时间启动清理inbox文件的线程
- nimbus.inbox.jar.expiration.secs: 3600 一个小时 jar过期时间
- nimbus.code.sync.freq.secs: 300 5分钟同步一次未执行的拓扑的代码
- nimbus.task.launch.secs: 120 用于task 第一次启动时的超时时间
- nimbus.file.copy.expiration.secs: 600 上传下载文件超时时间
- nimbus.topology.validator: “backtype.storm.nimbus.DefaultTopologyValidator” 拓扑验证,控制该拓扑是否可以执行
- topology.min.replication.count: 1 当nimbus seeds中该拓扑代码的备份达到最小数目时leader nimbus才可以执行拓扑动作。
- topology.max.replication.wait.time.sec: 60 当代码备份在nimbus list中达到topology.min.replication.count设置的最大等待时间,如果超时,不管有没有最小备份个数,都要执行该拓扑
supervisor相关配置
- supervisor.slots.ports: 设置当台机子上可跑的worker数目,每个worker对应一个port,通常情况下多少个cpu core就设置多少个worker,类似与hadoop中nodemanager中slot的设置
- supervisor.childopts: “-Xmx256m” supervisor jvm参数设置
- supervisor.worker.start.timeout.secs: 120 supervisor等待worker启动的最长时间
- supervisor.worker.timeout.secs: 30 worker的最长超时时间
- supervisor.worker.shutdown.sleep.secs: 1秒 supervisor shutdown worker需要等待的时间
- supervisor.monitor.frequency.secs: 3s检查一次worker的心跳确保是否要重启这些worker
- supervisor.heartbeat.frequency.secs: 5s supervisor和nimbus心跳的频率
- supervisor.enable: true supervisor是否要启动分配它的worker
worker 配置
- worker.childopts: “-Xmx768m”
- worker.gc.childopts: “” worker gc set 可以被topology.worker.gc.childopts.覆盖
- worker.heartbeat.frequency.secs: 1 worker 和supervisor的heartbeat时间
- topology.worker.receiver.thread.count: 1 每个worker设置的receiver 线程个数
- task.heartbeat.frequency.secs: 3s task向nimbus发送心跳的频率
- task.refresh.poll.secs: 10 多久和其他task同步连接(如果task重新分配,发往该task信息的那些task需要重练他们之间的连接)
message传递相关参数
- storm.messaging.netty.server_worker_threads:1, server端接收信息的线程个数
- storm.messaging.netty.client_worker_threads: 1, client端发送信息的线程个数
- storm.messaging.netty.buffer_size: 5M,netty buffer大小
- storm.messaging.netty.max_retries: 300 重试次数
- storm.messaging.netty.max_wait_ms: 1000ms=1s 最大等待时间要大于task launchtime and worker launch time默认120s,重连间隔要大于zk的sessiontimeout 以确保worker是否已挂
- storm.messaging.netty.min_wait_ms: 100
- storm.messaging.netty.transfer.batch.size: 262144 如果netty 发送消息非常忙,client客户端可以batch发送消息,否则尽快的flush消息以减少延迟。
注:我们的使用场景是storm kafka读取kafka里面的数据,发现运行一段时间以后,kafka消费的offset不再更新(2s更新一次消费的offset),spout 中的task不断重启导致offset一直不更新,查看log发现task失败的原因是gc设置不对以致netty连接超时,task会重新分配。在生产环境中应该调大该值,我们在线上设置该值为20s,并且调整worker的gc参数。
topology相关的设置, 针对特定拓扑的配置
正如上一讲的,以下参数控制消息是否被完全处理:
- topology.enable.message.timeouts: true 保证数据完全处理;
- topology.acker.executors: null 设置acker线程个数;
- topology.message.timeout.secs: 30 当一个消息的处理超时多长时间多少认为该tuple处理失败;
- topology.max.spout.pending: null 当spout 发送一个tuple时会将该tuple放到一个pending list,此字段控制在storm中处理的spout tuple数,可以根据超时时间以及每秒处理的消息数估算;
下面这几个参数和拓扑的并行度(parallelism),并行度的概念就是为该拓扑启动的线程数,TopologyBuilder#setSpout() 和TopologyBuilder#setbolt()中可以指定excutor数目,该excutor是从worker进程spawn的线程,task是处理数据的实际工作单元,跑在一个excutor上。
- topology.workers: 1,Config#setNumWorkers,设置worker数,一个worker执行一个拓扑的一个子集任务,其上可以跑多个excutors,可能是多个bolt或者spout;
- topology.tasks: null,top.setNumtasks,设置task数目;
- topology.max.task.parallelism: null,拓扑最大线程数;
worker gc设置
- topology.worker.childopts: null;
- topology.worker.logwriter.childopts: “-Xmx64m”;
- topology.worker.shared.thread.pool.size: 4 worker task 共享线程池大小;
worker内消息传送的参数,与disruptor相关:
- topology.executor.receive.buffer.size: 1024 #batched;
- topology.executor.send.buffer.size: 1024 #individual messages;
- topology.transfer.buffer.size: 1024 # batched;
- topology.disruptor.wait.strategy:com.lmax.disruptor.BlockingWaitStrategy 延迟和吞吐率权衡;
- topology.disruptor.wait.timeout.millis: 1000 延迟和cpu使用权衡,使用长延时时会减少cpu使用,减少等待时间可以保证延时小,但cpu负载高;
其他参数配置:
- topology.debug: false debug模式关闭;
- topology.tick.tuple.freq.secs: null 用于定时处理逻辑的拓扑使用;
- topology.spout.wait.strategy: “backtype.storm.spout.SleepSpoutWaitStrategy” 两种情形下等待1. no data 2. 达到最大pending大小;
- topology.sleep.spout.wait.strategy.time.ms: 1 sleep时间;
drpc logview ui的设置就不介绍了。
通过上面所讲的设置参数,可以发现gc设置和zookeeper设置非常重要,而且在message传输的相关设置中,有一系列参数如:receiver buffer size 、transfer buffer size,transfer buffer size,以及netty receiver thread 和worker receiver count 又有什么关系,上面提到到disruptor是什么东东,还有很多问题。所以在设置参数这个主题下还会再详细介绍下面三个问题:
- jvm 参数设置和调整,这个对于java程序的稳定运行至关重要(尤其是在大数据平台下,因为目前开源的大数据工具大部分都是java写的,jvm调优是必不可少的;
- zookeeper的维护和管理,在hadoop生态系统以及分布式系统中中必不可少的工具
- storm worker间和worker内的消息传递,包含disruptor的使用,通过这一节的介绍就会知道buffer size 设置为啥都是2的幂次方,这里buffer size 的单位是字节还是其他等等一些疑惑
参考:
公众号
个人公众号(柳年思水)已经上线,最新文章会同步在公众号发布,欢迎大家关注~