每个tuple可以包含多列,字段类型可以是: integer, long, short, byte, string, double, float, boolean和byte array。 Spout 数据源(Spout)是拓扑中数据流的来源。 一般 Spout 会从一个外部的数据源读取元组然后将他们发送到拓扑中。 根据需求的不同,Spout 既可以定义为可靠的数据源,也可以定义为不可靠的数据源。 一个可靠的 Spout能够在它发送的元组处理失败时重新发送该元组,以确保所有的元组都能得到正 确的处理; storm在检测到一个tuple被整个topology成功处理的时候调用ack, 否则调用fail。 不可靠的 Spout 就不会在元组发送之后对元组进行任何其他的处理。一个 Spout可以发送多个数据流。 Bolt 拓扑中所有的数据处理均是由 Bolt 完成的。 通过数据过滤(filtering)、函数处理(functions)、聚合(aggregations)、联结(joins)、数 据库交互等功能 一个 Bolt 可以实现简单的数据流转换,而更复杂的数据流变换通常需要使用多个 Bolt 并通过多个 步骤完成。 第一级Bolt的输出可以作为下一级Bolt的输入。而Spout不能有上一级。 Bolt 几乎能够完成任何一种数据处理需求。 Bolts的主要方法是execute(死循环)连续处理传入的tuple, 成功处理完每一个tuple调用OutputCollector的ack方法,以通知storm这个tuple被处理完成 了。 处理失败时,可以调fail方法通知Spout端可以重新发送该tuple。 StreamGroup 为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。 数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数 据流分组方式。 Reliablity 可靠性 Storm 可以通过拓扑来确保每个发送的元组都能得到正确处理。 通过跟踪由 Spout 发出的每个元组构成的元组树可以确定元组是否已经完成处理。 每个拓扑都有一个“消息延时”参数,如果 Storm 在延时时间内没有检测到元组是否处理完成,就会 将该元组标记为处理失败,并会在稍后重新发送该元组。 记录级容错Storm的DRPC DRPC (Distributed RPC) 分布式远程过程调用 DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。 DRPC Server 负责接收 RPC 请求 并将该请求发送到 Storm中运行的 Topology 等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。 DRPC设计目的: 为了充分利用Storm的计算能力实现高密度的并行实时计算。 Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。 客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。 实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。 DRPC 服务器会为每个函数调用都标记了一个唯一的 id。 随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器 根据函数调用的 id 来将函数调用的结果返回。 Storm的容错机制 集群节点宕机 Nimbus宕机 单点故障 从1.0.0版本以后,Storm的Nimbus是高可用的。 非Nimbus节点 故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行 进程故障 Worker 每个Worker中包含数个Bolt(或Spout)任务。 Supervisor负责监控这些任务,当worker失败后会尝试在本机重启它 如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上 Supervisor 无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭) 快速失败(fail-fast) 在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改 (增加、删除、修改) 则会抛出Concurrent Modification Exception java.util包下的集合类都是快速失败的,不能在多线程下发生并发修改 安全失败(fail-safe) 采用安全失败机制的集合容器,在遍历时不是直接在集合内容上访问的 而是先复制原有集合内容,在拷贝的集合上进行遍历 java.util.concurrent包下的容器都是安全失败,可以在多线程下并发使用,并发修改。 Nimbus 无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭) 任务级容错 Bolt任务crash引起的消息未被应答。 此时,acker中所有与此Bolt任务关联的消息都会因为超时而失败,对应的Spout的fail方法将被调用。 acker任务失败。 如果acker任务本身失败了,它在失败之前持有的所有消息都将超时而失败。Spout的fail方法将被调用。 Spout任务失败 在这种情况下,与Spout任务对接的外部设备(如MQ)负责消息的完整性。 消息的完整性 消息的完整性定义 每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成 千上万个新的Tuple 形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被 完全处理了。 消息完整性机制--Acker acker的任务就是追踪从spout中流出来的每一个message id绑定的若干tuple的处理路径, 如果在用户设置的最大超时时间内这些tuple没有被完全处理,那么acker就会告知spout该消 息处理失败了 相反则会告知spout该消息处理成功了。 XOR异或 异或的运算法则为:0异或0=0,1异或0=1,0异或1=1,1异或1=0(同为0,异为1) A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次 验证方式: spout或者bolt在处理完tuple后,都会告诉acker我已经处理完了该源tuple(如 tupleId=1),如果emit一个tuple的话,同时会告诉acker我发射了一个tuple(如 tupleId=2),如果在大量的高并发的消息的情况下,传统的在内存中跟踪执行情况的方 式,内存的开销会非常大,甚至内存溢出 acker巧妙的利用了xor的机制,只需要维护一个msgId的标记位即可,处理方法是acker 在初始的时候,对每个msgId初始化一个校验值ack-val(为0),在处理完tuple和emit tuple的时候,会先对这两个个值做xor操作,生成的中间值再和acker中的当前校验值 ack-val做xor生成新的ack-val值,当所有的tuple都处理完成都得到确认,那么最后的 ack-val自然就为0了 Storm的通信机制 Worker进程间通信原理 worker进程间消息传递机制,消息的接收和处理的流程如下图 worker进程 为了管理流入和传出的消息,每个worker进程都有一个独立的接收线程和发送线程 接收线程来负责将外部发送过来的消息移动到对应的executor线程的incoming-queue中 发送线程负责从worker的transfer-queue中读取消息,并通过网络发送给其他worker executor线程 每个executor有独立的incoming-queue 和outgoing-queue Worker接收线程将收到的消息通过task编号传递给对应的executor的incoming-queues executor有单独的线程分别来处理spout/bolt的业务逻辑,业务逻辑输出的中间数据会存放在 outgoing-queue 当executor的outgoing-queue中的tuple达到一定的阀值,executor的发送线程将批量获取 outgoing-queue中的tuple,并发送到transfer-queue中 每个worker进程控制一个或多个executor线程,用户可在代码中进行配置。其实就是我们在代码 中设置的并发度个数。 通信技术 netty:Netty是一个NIO client-server(客户端服务器)框架 Worker进程内通信原理 Disruptor是一个Queue。 Disruptor是实现了“队列”的功能,而且是一个有界队列(长度有限)。而队列的应用场景自然就 是“生产者-消费者”模型 Disruptor一种线程之间信息无锁的交换方式 (使用CAS(Compare And Swap/Set)操作) Disruptor主要特点 1、 没有竞争=没有锁=非常快。 2、 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结 构。 Disruptor 核心技术点 Disruptor可以看成一个事件监听或消息机制,在队列中一边生产者放入消息,另外一边消费 者并行取出处理. 底层是单个数据结构:一个ring buffer(环形数据缓冲区) Storm的数据分发策略 ShuffleGrouping 随机分组,随机派发stream里面的tuple,保证每个bolttask接收到的tuple数目大致相同。轮询,平均分配 优点: 为tuple选择task的代价小; bolt的tasks之间的负载比较均衡; 缺点: 上下游components之间的逻辑组织关系不明显 FieldsGrouping 按字段分组 比如,按"user-id"这个字段来分组,那么具有同样"user-id"的tuple会被分到相同的Bolt里的一个 task,而不同的"user-id"则可能会被分配到不同的task。 优点: 上下游components之间的逻辑组织关系显著; 缺点: 付出为tuple选择task的代价; bolt的tasks之间的负载可能不均衡,根据field字段而定 AllGrouping 广播发送,对于每一个tuple,所有的bolts都会收到 优点: 上游事件可以通知下游bolt中所有task; 缺点: tuple消息冗余,对性能有损耗,请谨慎使用; GlobalGrouping 全局分组,把tuple分配给taskid最低的task 优点: 所有上游消息全部汇总,便于合并、统计等; 缺点: bolt的tasks之间的负载可能不均衡,id最小的task负载过重; DirectGrouping 指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消 息接收者的哪个task处理这个消息。 只有被声明为DirectStream的消息流可以声明这种分组方法。 而且这种消息tuple必须使用emitDirect方法来发射。 消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法 也会返回task的id) 优点: Topology的可控性强,且组件的各task的负载可控; 缺点: 当实际负载与预估不符时性能削弱; Localorshufflegrouping 本地或随机分组。如果目标bolt有一个或者个task源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的ShuffleGrouping行为一致 优点: 相对于ShuffleGrouping,因优先选择同进程task间传输而降低tuple网络传输代价, 但因寻找同进程的task而消耗CPU和内存资源,因此应视情况来确定选择 ShuffleGrouping或LocalOrShuffleGrouping; 缺点: 上下游components之间的逻辑组织关系不明显; NoneGrouping 不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shufflegrouping是一 样的效果。有一点不同的是storm会把使用nonegrouping的这个bolt放到这个bolt的订阅者同一个 线程里面去执行(未来Storm如果可能的话会这样设计)。 customGrouping 自定义,相当于mapreduce那里自己去实现一个partition一样。



