Storm知识点
1. 离线计算是什么?
离线计算:批量获取数据、批量传输数据、 周期性批量计算数据、数据展示 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、Azkaban任务调度。2. 流式计算是什么?
流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示 代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)。3. storm核心组件和架构?
- 核心组件
- 架构

4. 并发度
用户指定的一个任务,可以被多个线程执行,并发度的数量等于线程的数量。一个任务的多个线程,会被运行在多个Worker(JVM)上。5. Storm运行模式
本地模式:在当前IDE运行; 集群模式:在集群上运行。6. Storm常见的分组策略
共有7种,这里记录常见的几种: shuffleGrouping-随机分组:Random函数; Non Grouping-不分组:Random函数; FieldGrouping-字段分组:Hash取模,保证同一类型数据数据流向同一个worker上的task; Local or ShuffleGrouping-本地随机分组:本地或随机,优先本地。7. Storm操作命令
- 任务提交命令:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
1 storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology wordcount
与hadoop不同的是:不需要指定输入输出路径,因为在代码中已经记录了
1 hadoop jar /usr/local/wordcount.jar /data.txt /wcout
- 杀死任务命令:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
1 storm kill topology-name -w 10
- 停用任务命令:storm deactive 【拓扑名称】
1 storm deactive topology-name
我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有
已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。
- 启用任务命令:storm activate 【拓扑名称】
1 storm activate topology-name
- 重新部署任务命令:storm rebalance 【拓扑名称】
1 storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。
8. Storm Ack机制 为了保证数据能正确的被处理, 对spout产生的每一个tuple, storm都会进行跟踪。即
ACK机制,storm启动acker task参与工作流程:
- Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪;
- Bolt在处理Tuple成功或失败后,也会发一个消息通知acker;
- acker会找到发射该Tuple的Spout,回调其ack或fail方法。
1 //BaseBasicBolt 实现可靠消息传递 2 public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack 3 public void execute(Tuple tuple, BasicOutputCollector collector) { 4 String sentence = tuple.getString(0); 5 for(String word: sentence.split(" ")) { 6 collector.emit(new Values(word)); 7 } 8 } 9 public void declareOutputFields(OutputFieldsDeclarer declarer) { 10 declarer.declare(new Fields("word")); 11 } 12 }在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
1 //BaseRichBolt 实现可靠消息传递 2 public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子 3 OutputCollector _collector; 4 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 5 _collector = collector; 6 } 7 public void execute(Tuple tuple) { 8 String sentence = tuple.getString(0); 9 for(String word: sentence.split(" ")) { 10 _collector.emit(tuple, new Values(word)); // 建立 anchor 树 11 } 12 _collector.ack(tuple); //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple); 13 } 14 public void declareOutputFields(OutputFieldsDeclarer declarer) { 15 declarer.declare(new Fields("word")); 16 } 17 }
9. 调整可靠性
有时候需要调整Storm的可靠性。例如,不关心数据是否丢失,或者想看看后面是否有某个Bolt拖慢Spout速度。有三种方法可以实现:- 在build topology时,设置acker数目为0,即conf.setNumAckers(0);
- 在Spout中,不指定messageId,使得Storm无法追踪;
- 在Bolt中,使用Unanchor方式发射新的Tuple。

更多精彩