NIFI是可以部署成集群的,在多台机器上分布式部署提高数据吞吐能力。本文第五章,通过源码,来梳理NIFI的分布式如何实现。
在上一节 NIFI的 WAL 日志部分有提到,NIFI的数据,分为两部分,一个是在界面上可操作的配置信息部分,比如添加的处理器,新增的处理器组,两个处理器之间新增的连接,处理器的启停,另外一个就是flowFile部分,比如Web接口接到XML之后,拆分成了很多条。所以这部分也分为两部分来说。
一、界面配置同步还是以处理器的启动为例,在本系列的第一篇文章中,就已经讲到了启动处理器的入口:
在NIFI的源文件中按照上述路径,就能找到,除去前部处理异常的代码,剩下的如下:
第546行有一个判断,点进去可以看到:
它是用来检查当前的请求是否应该被复制到集群中,当前的NIFI实例不属于集群中的节点或者没有与集群建立连接的时候,返回false. 前一个是通过配置判断,后一个则是查看有没有集群协调器或者有没有与集群协调器进行通信。然后是检查请求头中的的 X-Request-Replicated 是否设置.
在最开始启动某个处理器的时候,上述方法返回true,才能实现将节点的启动信息,通过集群告知其他节点一起启动。也就是说,对NIFI界面上配置的修改,应该是在操作的时候,同步到集群其他节点的。我们继续跟进到547行的方法,会来到这里:
红框这里,是判断是把请求只给到集群的协调器,还是直接给到每一个节点。
如果当前节点,已经被选做是集群的协调器, ReplicationTarget 返回 CLUSTER_NODES,代码走边,否则走下边。简单理解就是说,如果当前节点已经是协调器了,就可以直接把请求,给下边的参与节点了,否则需要把请求发给协调器,协调器再给到各个节点。
我们先跟到1102行进去,会来到这里,注意看类名:
这里173的判断除了抛出异常,没有做实质性的工作,跳过去,来到最后:
获取到所有处于连接状态的节点,
加锁,防止不同的请求在同一时间,修改流,同时注意280行,在请求中加了个属性REPLICATION_INDICATOR_HEADER。还记得前边的判断吧。通过这个字段,NIFI节点就知道,当前的请求,是来自于集群协调器的,不必在进行转发。
最后来到:
将给节点发送请求打成任务丢入线程池,完成对所有节点的请求。
回到刚刚的分支,当需要给协调器发消息,需要找到协调器的位置,然后发送消息。不再赘述
二、流程小结所以通过上边的代码整理可以看到:在集群模式下,当在一个NIFI节点的界面上,对流程进行修改的时候,该节点在后端,会先判断当前节点是否被选做了集群的协调器。如果是,则通过协调器就能将修改请求,发送到各个节点。如果不是,则当前节点知道协调器的位置,将修改请求转给协调器节点,然后重复一。协调器发出来的请求,在请求头中会带有标识 REPLICATION_INDICATOR_HEADER。各其他节点如果从请求中拿到的该字段有值,则不再对请求进行转发,转而执行该请求内容。
熟悉分布式的同学,会觉得博主的文档写的比较啰嗦,其实分布式这块儿,博主也是刚接触,很多概念也并不清楚,看代码看到了才会知道原来是这么回事。所以这个博客,也算是博主本人学习过程的一个记录。



