2021SC@SDUSC
storm代码阅读(七)2021SC@SDUSC
- storm代码阅读(七)
- GeneralTopologyContext
- GeneralTopologyContext的工具函数
GeneralTopologyContext类表示Topology的上下文环境,提供了较多的工具方法来方便获得Topology的结构信息。
GeneralTopologyContext定义如下:
public class GeneralTopologyContext implements JSONAware {
protected Map topoConf;
protected boolean doSanityCheck;
private StormTopology topology;
private Map taskToComponent;
private Map> componentToTasks;
private Map> componentToStreamToFields;
private String stormId;
}
下面介绍该类的一些成员变量:
topology: 为Thrift生成的Storm Topology类型的对象,其中含有Bolt和Spout的输入输出等信息。
taskToComponent: 为从TaskId到组件ID的映射。
componentToTasks: 为从组件ID到其对应的Task集合的映射。
componentToStreamToFields: 为从组件到每个输出流的模式的映射。
stormId: 表示Topology的id。
GeneralTopologyContext的工具函数GeneralTopologyContext中定义了很多工具函数,如下表所示:
在这些工具函数中,尤其重要的是getSources和getTargets方法,分别用来获取一个组件的输入和输出,具体代码如下:
getSources:
public Map getSources(String componentId) {
return getComponentCommon(componentId).get_inputs();
}
getTargets:
public Map> getTargets(String componentId) {
Map> ret = new HashMap<>();
for (String otherComponentId : getComponentIds()) {
Map inputs = getComponentCommon(otherComponentId).get_inputs();
for (Map.Entry entry : inputs.entrySet()) {
GlobalStreamId id = entry.getKey();
if (id.get_componentId().equals(componentId)) {
Map curr = ret.get(id.get_streamId());
if (curr == null) {
curr = new HashMap<>();
}
curr.put(otherComponentId, entry.getValue());
ret.put(id.get_streamId(), curr);
}
}
}
return ret;
}
里面的getComponentCommon和getComponentIds方法来自ThriftTopologyUtils类,它不是通过thriftAPI去nimbus获取信息, 只是从StormTopology里读取信息,而StormTopology类本身是generated by thrift。
thrift产生的class是有metaDataMap的, 实现如下:
public static SetgetComponentIds(StormTopology topology) { Set ret = new HashSet (); for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) { Map componentMap = (Map ) topology.getFieldValue(f); ret.addAll(componentMap.keySet()); } return ret; }



