- storm源码分析(二)
- Strom在Zookeeper中的目录结构
- 插入的包
- 该类中的属性
- 基本方法
2021SC@SDUSC storm源码分析(二)
2021SC@SDUSC
ZooKeeper:
Storm重点依赖的外部资源,Nimbus、Supervisor和Worker等都是把心跳数据保存在ZooKeeper上,Nimbus也是根据ZooKeeper上的心跳和任务运行状况进行调度和任务分配的。
Storm的所有的状态信息都是保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务,
supervisor,task通过从zookeeper中读状态来领取任务,同时supervisor, task也会定义发送心跳信息到zookeeper,
使得nimbus可以监控整个storm集群的状态,从而可以重启一些挂掉的task。
ZooKeeper 使得整个storm集群十分的健壮,任何一台工作机器挂掉都没有关系,只要重启然后从zookeeper上面重新获取状态信息就可以了。
Strom在Zookeeper中的目录结构/assignments -> 任务分配信息
/storms -> 正在运行的topology的ID
/supervisors -> 所有的Supervisors的心跳信息
/workerbeats -> 所有的Worker的心跳
/errors -> 产生的出错信息
首先分析ClientZookeeper类。
插入的包package org.apache.storm.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.storm.callback.WatcherCallBack; import org.apache.storm.cluster.DaemonType; import org.apache.storm.cluster.VersionedData; import org.apache.storm.shade.org.apache.curator.framework.Curatorframework; import org.apache.storm.shade.org.apache.curator.framework.api.CuratorEvent; import org.apache.storm.shade.org.apache.curator.framework.api.CuratorEventType; import org.apache.storm.shade.org.apache.curator.framework.api.CuratorListener; import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener; import org.apache.storm.shade.org.apache.zookeeper.CreateMode; import org.apache.storm.shade.org.apache.zookeeper.KeeperException; import org.apache.storm.shade.org.apache.zookeeper.WatchedEvent; import org.apache.storm.shade.org.apache.zookeeper.data.ACL; import org.apache.storm.shade.org.apache.zookeeper.data.Stat; import org.apache.storm.utils.CuratorUtils; import org.apache.storm.utils.Utils; import org.apache.storm.utils.ZookeeperAuthInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;该类中的属性
private static final ClientZookeeper INSTANCE = new ClientZookeeper();
private static Logger LOG = LoggerFactory.getLogger(ClientZookeeper.class);
private static ClientZookeeper _instance = INSTANCE;
ClientZookeeper类型的实例_instance和Logger类型的日志LOG。
基本方法public static void setInstance(ClientZookeeper u) {
_instance = u;
}
public static void resetInstance() {
_instance = INSTANCE;
}
public static void mkdirs(Curatorframework zk, String path, List acls) {
_instance.mkdirsImpl(zk, path, acls);
}
public static Curatorframework mkClient(Map conf, List servers, Object port,
String root, final WatcherCallBack watcher, Map authConf, DaemonType type) {
return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf, type);
}
一些基本方法,设置_instance。
mkdirs方法将Curatorframework、path和List 访问控制列表传入_instance中。
mkClient方法通过Map和servers列表等得到该实例_instance对应的client的信息。
deleteNodeBlobstore方法
public static void deleteNodeBlobstore(Curatorframework zk, String parentPath, String hostPortInfo) {
String normalizedParentPath = normalizePath(parentPath);
List childPathList = null;
if (existsNode(zk, normalizedParentPath, false)) {
childPathList = getChildren(zk, normalizedParentPath, false);
for (String child : childPathList) {
if (child.startsWith(hostPortInfo)) {
LOG.debug("deleteNode child {}", child);
deleteNode(zk, normalizedParentPath + "/" + child);
}
}
}
}
删除zookeeper中某个键的状态。
密钥内容以“nimbus主机端口信息”开头。
createNode方法
public static String createNode(Curatorframework zk, String path, byte[] data, CreateMode mode, Listacls) { String ret = null; try { String npath = normalizePath(path); ret = zk.create().creatingParentsIfNeeded().withMode(mode).withACL(acls).forPath(npath, data); } catch (Exception e) { throw Utils.wrapInRuntime(e); } return ret; } public static String createNode(Curatorframework zk, String path, byte[] data, List acls) { return createNode(zk, path, data, CreateMode.PERSISTENT, acls); }
创建节点。
tokenizePath方法
public static ListtokenizePath(String path) { String[] toks = path.split("/"); java.util.ArrayList rtn = new ArrayList (); for (String str : toks) { if (!str.isEmpty()) { rtn.add(str); } } return rtn; }
分解路径字符串。
// An highlighted block var foo = 'bar';
getDataWithVersion方法
public static VersionedDatagetDataWithVersion(Curatorframework zk, String path, boolean watch) { VersionedData data = null; try { byte[] bytes = null; Stat stats = new Stat(); String npath = normalizePath(path); if (existsNode(zk, npath, watch)) { if (watch) { bytes = zk.getData().storingStatIn(stats).watched().forPath(npath); } else { bytes = zk.getData().storingStatIn(stats).forPath(npath); } if (bytes != null) { int version = stats.getVersion(); data = new VersionedData<>(version, bytes); } } } catch (Exception e) { if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { // this is fine b/c we still have a watch from the successful exists call } else { Utils.wrapInRuntime(e); } } return data; }
获取数据和版本。
······
该类中都是一些基本的set、get方法。



