Storm是一个分布式实时大数据处理系统,它是一个流数据框架,具有最高的摄取率,虽然Storm是无状态的,它通过ZooKeeper管理分布式环境和集群状态,保证每个消息将通过拓扑至少处理一次
关键字:实时、流数据
storm类似图片的电梯,一直往上传送数据,数据一上去就被传送、处理
| tuple | 元组,数据结构,有序的元素列表,通常是任意类型的数据,outputCollector.emit(new Values(s1));,这里的new Value(s1)就是一个tuple |
|---|---|
| Stream | 流,一序列的tuple,拓扑图中的线 |
| Spouts | 流的源,storm从原始数据源接收输入数据,可以编写以获取数据源读取数据,实现IRichSpout接口,继承baseRichSpout |
| Bolts | Bolts是逻辑处理单元,Spout将数据传递到Bolts的过程以及Bolts之间,并产生新的输出流,一些常见的接口:IRichBolt、IBasicBolt等 |
| 拓扑 | Spouts和Bolts连接在一起,形成拓扑结构,简单来说拓扑是有向图,其中顶点是计算,边缘时数据流Spouts将数据发射到一个或者多个Bolts,bolt表示拓扑中最小的逻辑节点,Bolts的输入可以发射到另一个Bolts作为输入 |
| 进程 | 拓扑在多个工作节点上以分布式方式运行,Storm将所有的工作节点上的任务均匀分布 |
| 任务 | spout和bolt的执行的过程就是任务 |
| 流分组 | 控制tuple如何进行路由,数据流从Spouts流到Bolts,或从一个Bolts流到另一个Bolts,内置4个分组策略 |
Nimbus:master node,主要工作运行拓扑,分析元组tuple,收集执行的task,将task分给supervisor
supervisor:工作节点,有多个处理进程,代理任务给所有的wokr进程,在Nimbus和supervisor之间使用内部的消息系统通信
Nimbus:master node ,在work node间分发数据,指派task给work node,监控故障
supervisor:接收nimbus的指令,有多个work进程,监视work进程,完成task
work process:执行相关的task,本身不执行,创建executor(执行线程),可以有多个执行线程
executor:执行线程
task:处理数据
zookeeper:维持状态
随机分组,随机派发stream里面的tuple,保证每个bolt task 接收到tuple数目一样大,平均分配,轮询
一个spout,有两个Bolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);// 一个spout
builder.setBolt("bolt",new MyBolt(),2).shuffleGrouping("spout");
输出的结果:
default Thread-9-bolt--id=83 line :1 session_id:123 default Thread-11-bolt--id=85 line :1 session_id:dfghj default Thread-9-bolt--id=83 line :2 session_id:34567 default Thread-11-bolt--id=85 line :2 session_id:2345 default Thread-9-bolt--id=83 line :3 session_id:234fv default Thread-9-bolt--id=83 line :4 session_id:456yg default Thread-9-bolt--id=83 line :5 session_id:23456ygh default Thread-11-bolt--id=85 line :3 session_id:werfg default Thread-11-bolt--id=85 line :4 session_id:345 default Thread-11-bolt--id=85 line :5 session_id:23456ygh default Thread-11-bolt--id=85 line :6 session_id:1 default Thread-9-bolt--id=83 line :6 session_id:456yg default Thread-9-bolt--id=83 line :7 session_id:2 default Thread-11-bolt--id=85 line :7 session_id:2
其中Thread-9-bolt的id为83,另一个是85,两个随机分配,但是保证了平均分配
2)字段分组 (Fields Grouping) 一般用在计数按照字段分组,比如,按照"session_id"这个字段来分组,那么具有相同的值的"session_id"的tuple被分配到相同的Bolt里的一个task,而不同的"session_id",不同的"session_id"的可能被分配到不同的task.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);// 一个spout
builder.setBolt("bolt",new MyBolt(),2).fieldsGrouping("spout",new Fi
输出结果:
11 2 3 default Thread-11-bolt--id=85 line :6 session_id:2 1 2 3 default Thread-11-bolt--id=85 line :7 session_id:2 1 2 3 default Thread-11-bolt--id=85 line :8 session_id:2 1 2 3 default Thread-11-bolt--id=85 line :9 session_id:2 1 2 3 default Thread-11-bolt--id=85 line :10 session_id:2 11 2 33 default Thread-11-bolt--id=85 line :11 session_id:2
数据格式中的第二个是session_id,session_id值相同的交给同一个 Thread-11-bolt–id
3)所有分组 (All Grouping)常用来发送信号广播发送,对于每一个tuple,所有的bolt都会收到
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);// 一个spout
builder.setBolt("bolt",new MyBolt(),2).allGrouping("spout");
输出结果:
ssssssssss 123 sdfv default Thread-9-bolt--id=83 line :1 session_id:123 ssssssssss 123 sdfv default Thread-11-bolt--id=85 line :1 session_id:123 sdfgh dfghj sdfghj default Thread-9-bolt--id=83 line :2 session_id:dfghj sdfgh dfghj sdfghj default Thread-11-bolt--id=85 line :2 session_id:dfghj
每个tuple都会被每个bolt处理一边
4)全局分组 (Global Grouping)一般用来汇总全局分组,把tuple分配给task_id 最低的task.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",new MySpout(),1);// 一个spout
builder.setBolt("bolt",new MyBolt(),3).globalGrouping("spout");
输出结果:
12 1 1 default Thread-9-bolt--id=82 line :12 session_id:1 11 2 3 default Thread-9-bolt--id=82 line :13 session_id:2 1 2 3 default Thread-9-bolt--id=82 line :14 session_id:2 1 2 3 default Thread-9-bolt--id=82 line :15 session_id:2 1 2 3 default Thread-9-bolt--id=82 line :16 session_id:2 1 2 3 default Thread-9-bolt--id=82 line :17 session_id:2 11 2 33 default Thread-9-bolt--id=82 line :18 session_id:2
每次都是最小的id执行task
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("wordSpout",new WordSpout());
// SplitBolt单词分割器设置4个Task,2个Executeor(线程)
builder.setBolt("splitBolt",new SplitBolt(),2).shuffleGrouping("wordSpout");
//CountBolt单词计数器设置4个Executeor(线程)
builder.setBolt("countBolt",new CountBolt(),4).fieldsGrouping("splitBolt",new Fields("word"));
// 最后countBolt把发送的tuple 汇总到唯一的ReporteBolt
builder.setBolt("reportBolt",new ReportBolt()).globalGrouping("countBolt");
最后4个countBolt被汇总到countBolt
其他的就做演示



