栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Nimbus(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Nimbus(二)

2021SC@SDUSC

上一篇博客展示了nimbus的核心代码和一些基本的分析注释,本文将详细分析解释第一部分代码,并进行补充。

nimbus-data
         nimbus-data函数的定义如下所示

(defn nimbus-data [conf inimbus]

 (let [forced-scheduler (.getForcedScheduler inimbus)]

   {:conf conf

    :inimbus inimbus

    :submitted-count (atom 0)

    :storm-cluster-state (cluster/mk-storm-cluster-state conf)

    :submit-lock (Object.)

    :heartbeats-cache (atom {})

    :downloaders (file-cache-map conf)

    :uploaders (file-cache-map conf)

    :uptime (uptime-computer)

    :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))

    :timer (mk-timer :kill-fn (fn [t]

                                 (log-error t"Error when processing event")

                                 (halt-process!20 "Error when processing an event")

                                 ))

    :scheduler (mk-scheduler conf inimbus)

    }))

         let的bindings定义的变量forced-scheduler的值是个nil,因为INimbus中实现的getForcedScheduler方法是个空实现。

         再看看let 块的执行体,它是一个map。其中:conf 的值是一个storm 配置的map,:inimbus的值是INimbus接口的匿名实现也就是一个适配器。submitted-count 的值是一个atom类型的值,初始化为0。:storm-cluster-state的值是什么,需要看看cluster/mk-storm-cluster-state这个函数内部是做了什么。

         cluster/mk-storm-cluster-state这个函数内部匿名实现了StormClusterState协议,而实现时又用到了ClusterState协议。这个函数主要用于获取集群的各种状态,底层zookeeper的操作上层封装在了ClusterState协议的实现,而真正底层访问zookeeper使用apache curator这个开源库。ClusterState协议属于zookeeper的部分,就不展开详细讲述了,我们接着代码继续分析

 

          :submit-lock (Object.)    

    :heartbeats-cache (atom {})

    :downloaders (file-cache-map conf)

    

submit-lock就是一个java的Object对象用于加锁解锁,heartbeats-cache是一个atom引用类型,这个引用所包含的的值是一个空的map。

downloaders是一个TimeCacheMap对象。downloaders的值是(file-cache-mapconf)

其中file-cache-map是一个函数,只需要知道TimeCacheMap是storm自己实现的,对外提供了ExpiredCallback接口类,这个接口类中只有一个方法是expire方法。在file-cache-map函数中匿名实现了这个接口。

TimeCacheMap非常复杂,不过多介绍,只需知道TimeCacheMap对外展示的是一个类似map类,可以在生成这个对象的时候设置超时回调函数(也就是ExpiredCallback这个接口,超时处理,在这里就是关闭流),之后可以像map一样put,get进行操作,在超时时间到了之后,会调用用户设置的回到函数,将超时的数据传递到回调函数中进行处理。需要注意的是,这个类的超时不是很精确。

        回到nimbus-data接着看

         :uploaders(file-cache-map conf)

    :uptime (uptime-computer)

    

uploaders的值和前面的downloader的值是一样的,也是一个TimeCacheMap对象。

uptime的值是一个函数,是uptime-computer这个函数所返回的一个匿名函数,如下所示:

(defn uptime-computer []

 (let [start-time (current-time-secs)]

   (fn []

     (time-delta start-time)

     )))

uptime-computer就是用于计算时间差的,需要注意的是,在添加:uptime (uptime-computer)

这条记录的时候,start-time已经开始取当前时间了,那么在之后真正取:uptime的值开始计算的时候,就会得到时间差。

在service-handler中有条语句nimbus-uptime ((:uptime nimbus)) 这就会计算出,在设置nimbus这个map的时候时间点和当前时间的差。两层圆括号的意思是,先从map中取出值,因为值是一个函数,所以还要再用一对圆括号括起来执行。

         回到nimbus-data接着看

         :validator(new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))

    :timer (mk-timer :kill-fn (fn [t]

                                 (log-error t "Error whenprocessing event")

                                 (halt-process!20 "Error when processing an event")

                                 ))

    :scheduler (mk-scheduler conf inimbus)

validator的值是一个对象。类名是在default.yaml中所指定的nimbus.topology.validator:"backtype.storm.nimbus.DefaultTopologyValidator",这里的先取到类名,然后根据类名创建对象。

timer的值是一个map ,这个map中包含了定时器,队列,锁等数据。在之前分析supervisor启动过程中也有提到mk-timer,这里再来回顾一下。mk-timer和java中的Timer是很像的。mk-timer的做法是启动一个线程,循环从队列中去peek一个数据,这个数据是一个vector类型的数据,内有会有三个值,分别是时间,函数,和uuid。线程把当前时间和从队列预读的这个数据中的时间值进行比较,如果时间到了,则从队列中弹出这个数据,然后执行这个数据中的第二个值,也就是函数。在执行完之后,会sleep 1秒,这个sleep的实现是在storm自己提供的Time.java中。当sleep结束,会重复这个动作。

mk-timer最后会返回的值如下所示:

{:timer-thread timer-thread

    :queue queue

    :active active

     :lock lock

    :cancel-notifier notifier}

也就是定时器线程,队列,锁等数据。

scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。也就是定时器线程,队列,锁等数据。scheduler的值是一个调度器。这个调度器可以有三种,默认的调度器,从INimbus中获取的调度器,以及是用户自定义的调度器。在这里取到的是默认的调度器。

参考博客:  https://blog.csdn.net/chlaws/article/details/13756021

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/324993.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号