Storm Code Reading (Part 2)

2021SC@SDUSC

Reading the Topology Module (Part 1)
  • Storm Code Reading (Part 2)
    • Introduction to Topology
    • TopologyBuilder Code Overview
    • Analysis of Specific TopologyBuilder Code Segments

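Introduction to Topology

A job running on a Storm cluster is called a Topology.

A Topology pulls data from a data source and then processes it. The component that reads data from the external source is called a Spout, and the component that processes data is called a Bolt. A Topology is made up of one or more Spouts and Bolts. Note that a Topology must contain both Spouts and Bolts, although the number of each is unrestricted.

[Figure: several example topologies]

A Topology is a network graph built, according to certain rules, from Spouts, Bolts, and Tuples (the data carrier). A Storm Topology is similar to a MapReduce job; one key difference is that a MapReduce job eventually finishes after running for a while, whereas a Storm Topology keeps running until it is killed.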
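To make the Spout, Bolt, and Tuple roles concrete, here is a minimal sketch in plain Java. It does not use Storm at all: `MiniTopology` is a hypothetical toy in which the spout is an `Iterator<String>`, each bolt is a `Function<String, String>`, and a tuple is just a `String`. Unlike a real Storm topology, it stops when its finite source is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Toy model (not Storm code) of a linear topology: one spout feeding a chain of bolts. */
final class MiniTopology {
    private final Iterator<String> spout;                                   // data source
    private final List<Function<String, String>> bolts = new ArrayList<>(); // processing steps

    MiniTopology(Iterator<String> spout) {
        this.spout = spout;
    }

    /** Appends a processing step; returns this so calls can be chained. */
    MiniTopology addBolt(Function<String, String> bolt) {
        bolts.add(bolt);
        return this;
    }

    /** Pulls every tuple from the spout and passes it through each bolt in order. */
    List<String> run() {
        List<String> out = new ArrayList<>();
        while (spout.hasNext()) {
            String tuple = spout.next();
            for (Function<String, String> bolt : bolts) {
                tuple = bolt.apply(tuple);
            }
            out.add(tuple);
        }
        return out;
    }
}
```

A real topology is a graph rather than a single chain, and its components run in parallel across workers; the sketch only shows the direction in which data flows.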

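TopologyBuilder Code Overview

Storm provides the TopologyBuilder class for creating a Topology. TopologyBuilder is essentially a wrapper around the Topology's Thrift interface: a Topology is a structure defined in Thrift, TopologyBuilder constructs that object, and Nimbus runs a Thrift server that receives the structure submitted by the user. Because the structure is defined in Thrift, users can also build a Topology in other languages, which makes multi-language support fairly convenient.

Its main methods are setSpout, setBolt, and their overloads; the end goal is to create a StormTopology object.

The TopologyBuilder class is defined as follows: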

public class TopologyBuilder {
    private final Map<String, IRichBolt> bolts = new HashMap<>();
    private final Map<String, IRichSpout> spouts = new HashMap<>();
    private final Map<String, ComponentCommon> commons = new HashMap<>();
    private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();
    private final Map<String, SharedMemory> sharedMemory = new HashMap<>();
    private boolean hasStatefulBolt = false;

    private Map<String, StateSpoutSpec> stateSpouts = new HashMap<>();
    private List<ByteBuffer> workerHooks = new ArrayList<>();

    private static String mergeIntoJson(Map<String, Object> into, Map<String, Object> newMap) {
        Map<String, Object> res = new HashMap<>(into);
        res.putAll(newMap);
        return JSONValue.toJSONString(res);
    }

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : bolts.keySet()) {
            IRichBolt bolt = bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type "
                                    + wrapperCause.getCause().getMessage() + ", "
                                    + "which was instantiated prior to topology creation. "
                                    + wrapperCause.getCause().getMessage()
                                    + " "
                                    + "should be instantiated within the prepare method of '"
                                    + boltId
                                    + " at the earliest.",
                            wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : spouts.keySet()) {
            IRichSpout spout = spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                            "Spout '" + spoutId + "' contains a non-serializable field of type "
                                    + wrapperCause.getCause().getMessage()
                                    + ", which was instantiated prior to topology creation. "
                                    + wrapperCause.getCause().getMessage()
                                    + " should be instantiated within the open method of '"
                                    + spoutId
                                    + " at the earliest.",
                            wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(workerHooks);

        if (!componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(componentToSharedMemory);
            stormTopology.set_shared_memory(sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

    
    public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }

    
    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, bolt, parallelismHint);
        bolts.put(id, bolt);
        return new BoltGetter(id);
    }

    
    public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }

    
    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
    }

    
    public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }

    
    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint);
    }

    
    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }

    
    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint) throws
        IllegalArgumentException {
        hasStatefulBolt = true;
        return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelismHint);
    }

    
    public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }

    
    public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint) throws
        IllegalArgumentException {
        hasStatefulBolt = true;
        IStatefulBolt<T> executor;
        if (bolt.isPersistent()) {
            executor = new PersistentWindowedBoltExecutor<>(bolt);
        } else {
            executor = new StatefulWindowedBoltExecutor<T>(bolt);
        }
        return setBolt(id, new StatefulBoltExecutor<T>(executor), parallelismHint);
    }

    
    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields) throws
        IllegalArgumentException {
        return setBolt(id, biConsumer, null, fields);
    }

    
    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint,
                                String... fields) throws IllegalArgumentException {
        return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelismHint);
    }

    
    public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer) throws IllegalArgumentException {
        return setBolt(id, consumer, null);
    }

    
    public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new LambdaConsumerBolt(consumer), parallelismHint);
    }

    
    public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
        return setSpout(id, spout, null);
    }

    
    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, spout, parallelismHint);
        spouts.put(id, spout);
        return new SpoutGetter(id);
    }

    
    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
        return setSpout(id, supplier, null);
    }

    
    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint) throws IllegalArgumentException {
        return setSpout(id, new LambdaSpout(supplier), parallelismHint);
    }

    
    public void addWorkerHook(IWorkerHook workerHook) {
        if (null == workerHook) {
            throw new IllegalArgumentException("WorkerHook must not be null.");
        }

        workerHooks.add(ByteBuffer.wrap(Utils.javaSerialize(workerHook)));
    }

    private void validateUnusedId(String id) {
        if (bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
        if (stateSpouts.containsKey(id)) {
            throw new IllegalArgumentException("State spout has already been declared for id " + id);
        }
    }

    
    private void maybeAddCheckpointSpout() {
        if (hasStatefulBolt) {
            setSpout(CHECKPOINT_COMPONENT_ID, new CheckpointSpout(), 1);
        }
    }

    private void maybeAddCheckpointInputs(ComponentCommon common) {
        if (hasStatefulBolt) {
            addCheckPointInputs(common);
        }
    }

    
    private IRichBolt maybeAddCheckpointTupleForwarder(IRichBolt bolt) {
        if (hasStatefulBolt && !(bolt instanceof StatefulBoltExecutor)) {
            bolt = new CheckpointTupleForwarder(bolt);
        }
        return bolt;
    }

    
    private void addCheckPointInputs(ComponentCommon component) {
        Set<GlobalStreamId> checkPointInputs = new HashSet<>();
        for (GlobalStreamId inputStream : component.get_inputs().keySet()) {
            String sourceId = inputStream.get_componentId();
            if (spouts.containsKey(sourceId)) {
                checkPointInputs.add(new GlobalStreamId(CHECKPOINT_COMPONENT_ID, CHECKPOINT_STREAM_ID));
            } else {
                checkPointInputs.add(new GlobalStreamId(sourceId, CHECKPOINT_STREAM_ID));
            }
        }
        for (GlobalStreamId streamId : checkPointInputs) {
            component.put_to_inputs(streamId, Grouping.all(new NullStruct()));
        }
    }

    private ComponentCommon getComponentCommon(String id, IComponent component) {
        ComponentCommon ret = new ComponentCommon(commons.get(id));
        OutputFieldsGetter getter = new OutputFieldsGetter();
        component.declareOutputFields(getter);
        ret.set_streams(getter.getFieldsDeclaration());
        return ret;
    }

    private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            int dop = parallelism.intValue();
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.set_parallelism_hint(dop);
        }
        Map<String, Object> conf = component.getComponentConfiguration();
        if (conf != null) {
            common.set_json_conf(JSONValue.toJSONString(conf));
        }
        commons.put(id, common);
    }

    // Inner classes such as BoltGetter and SpoutGetter are omitted from this listing.
}

Analysis of Specific TopologyBuilder Code Segments

Segment-by-segment analysis:

    private final Map<String, IRichBolt> bolts = new HashMap<>();
    private final Map<String, IRichSpout> spouts = new HashMap<>();
    private final Map<String, ComponentCommon> commons = new HashMap<>();
    private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();
    private final Map<String, SharedMemory> sharedMemory = new HashMap<>();
    private boolean hasStatefulBolt = false;

    private Map<String, StateSpoutSpec> stateSpouts = new HashMap<>();
    private List<ByteBuffer> workerHooks = new ArrayList<>();

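This part defines the class's member variables. bolts holds every Bolt object, all of type IRichBolt; spouts holds every Spout object, all of type IRichSpout; commons holds the ComponentCommon object of every Bolt and Spout, keyed by component id; stateSpouts holds the StateSpout entries, a StateSpout being a Spout with synchronization capability.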

    public StormTopology createTopology() {
        Map<String, Bolt> boltSpecs = new HashMap<>();
        Map<String, SpoutSpec> spoutSpecs = new HashMap<>();
        maybeAddCheckpointSpout();
        for (String boltId : bolts.keySet()) {
            IRichBolt bolt = bolts.get(boltId);
            bolt = maybeAddCheckpointTupleForwarder(bolt);
            ComponentCommon common = getComponentCommon(boltId, bolt);
            try {
                maybeAddCheckpointInputs(common);
                boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException("Bolt '" + boltId + "' contains a non-serializable field of type "
                                    + wrapperCause.getCause().getMessage() + ", "
                                    + "which was instantiated prior to topology creation. "
                                    + wrapperCause.getCause().getMessage()
                                    + " "
                                    + "should be instantiated within the prepare method of '"
                                    + boltId
                                    + " at the earliest.",
                            wrapperCause);
                }
                throw wrapperCause;
            }
        }
        for (String spoutId : spouts.keySet()) {
            IRichSpout spout = spouts.get(spoutId);
            ComponentCommon common = getComponentCommon(spoutId, spout);
            try {
                spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
            } catch (RuntimeException wrapperCause) {
                if (wrapperCause.getCause() != null && NotSerializableException.class.equals(wrapperCause.getCause().getClass())) {
                    throw new IllegalStateException(
                            "Spout '" + spoutId + "' contains a non-serializable field of type "
                                    + wrapperCause.getCause().getMessage()
                                    + ", which was instantiated prior to topology creation. "
                                    + wrapperCause.getCause().getMessage()
                                    + " should be instantiated within the open method of '"
                                    + spoutId
                                    + " at the earliest.",
                            wrapperCause);
                }
                throw wrapperCause;
            }
        }

        StormTopology stormTopology = new StormTopology(spoutSpecs,
                                                        boltSpecs,
                                                        new HashMap<>());

        stormTopology.set_worker_hooks(workerHooks);

        if (!componentToSharedMemory.isEmpty()) {
            stormTopology.set_component_to_shared_memory(componentToSharedMemory);
            stormTopology.set_shared_memory(sharedMemory);
        }

        return Utils.addVersions(stormTopology);
    }

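This part builds the StormTopology object from the registered Bolt and Spout objects. The lines boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); and spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); show that, inside StormTopology, both Bolts and Spouts are stored as the byte arrays produced by serializing the objects. This is also why a component holding a non-serializable field fails here with the IllegalStateException constructed in the catch blocks.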
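The serialization step, and the error message about instantiating fields "within the prepare method", can be illustrated with plain Java serialization. The sketch below is not Storm code: `SerializationSketch`, `Connection`, `EagerBolt`, and `LazyBolt` are hypothetical names, and `javaSerialize` here is only an assumed approximation of what a helper like `Utils.javaSerialize` does (serialize to a byte array and wrap any `IOException` in a `RuntimeException`).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustration only: why components should create non-serializable fields lazily. */
final class SerializationSketch {
    /** Stand-in for a resource that cannot be serialized, such as a network connection. */
    static final class Connection { }

    /** Fails to serialize: the field is populated before the object is serialized. */
    static final class EagerBolt implements Serializable {
        private final Connection conn = new Connection();
    }

    /** Serializes fine: the field is transient and only created in prepare(). */
    static final class LazyBolt implements Serializable {
        private transient Connection conn;

        void prepare() {
            conn = new Connection();
        }

        boolean isPrepared() {
            return conn != null;
        }
    }

    /** Serializes obj to a byte array, wrapping checked I/O failures in a RuntimeException. */
    static byte[] javaSerialize(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Serializing an `EagerBolt` throws a `RuntimeException` whose cause is a `NotSerializableException`, and that exception's message is the name of the offending class. This matches the shape of the check in createTopology, which inspects `getCause()` and reuses `getCause().getMessage()` as the field type in its error text.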

    public BoltDeclarer setBolt(String id, IRichBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }


    public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, bolt, parallelismHint);
        bolts.put(id, bolt);
        return new BoltGetter(id);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }


    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint);
    }


    public BoltDeclarer setBolt(String id, IWindowedBolt bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }


    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint);
    }


    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }


    public <T extends State> BoltDeclarer setBolt(String id, IStatefulBolt<T> bolt, Number parallelismHint) throws
        IllegalArgumentException {
        hasStatefulBolt = true;
        return setBolt(id, new StatefulBoltExecutor<T>(bolt), parallelismHint);
    }


    public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt) throws IllegalArgumentException {
        return setBolt(id, bolt, null);
    }


    public <T extends State> BoltDeclarer setBolt(String id, IStatefulWindowedBolt<T> bolt, Number parallelismHint) throws
        IllegalArgumentException {
        hasStatefulBolt = true;
        IStatefulBolt<T> executor;
        if (bolt.isPersistent()) {
            executor = new PersistentWindowedBoltExecutor<>(bolt);
        } else {
            executor = new StatefulWindowedBoltExecutor<T>(bolt);
        }
        return setBolt(id, new StatefulBoltExecutor<T>(executor), parallelismHint);
    }


    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, String... fields) throws
        IllegalArgumentException {
        return setBolt(id, biConsumer, null, fields);
    }


    public BoltDeclarer setBolt(String id, SerializableBiConsumer<Tuple, BasicOutputCollector> biConsumer, Number parallelismHint,
                                String... fields) throws IllegalArgumentException {
        return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelismHint);
    }


    public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer) throws IllegalArgumentException {
        return setBolt(id, consumer, null);
    }


    public BoltDeclarer setBolt(String id, SerializableConsumer<Tuple> consumer, Number parallelismHint) throws IllegalArgumentException {
        return setBolt(id, new LambdaConsumerBolt(consumer), parallelismHint);
    }

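This part defines setBolt and its overloads. The line return setBolt(id, new BasicBoltExecutor(bolt), parallelismHint); shows that setBolt wraps an incoming IBasicBolt object in a BasicBoltExecutor, which also takes care of tracking and emitting messages on the bolt's behalf. The other overloads follow the same pattern: each wraps its argument in an executor or adapter and delegates to the IRichBolt overload. There, validateUnusedId(id); checks that the component ID is not already in use, initCommon(id, bolt, parallelismHint); creates the ComponentCommon object for the component, and bolts.put(id, bolt); registers the bolt itself. The final line, return new BoltGetter(id);, returns a BoltGetter object, which is then used to add inputs to the Bolt.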
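The two ideas in setBolt, a shared id check and convenience overloads that wrap their argument and delegate to one core overload, can be sketched without Storm. Everything below is hypothetical: `MiniBuilder`, `RichBolt`, and `ConsumerBolt` are simplified stand-ins for TopologyBuilder, IRichBolt, and an adapter such as LambdaConsumerBolt, and tuples are plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy builder (not Storm code) showing the id check and overload delegation. */
final class MiniBuilder {
    /** Stand-in for IRichBolt: the only bolt type the builder stores. */
    interface RichBolt {
        void prepare();
        void execute(String tuple);
    }

    /** Adapter that lets a plain Consumer be registered as a RichBolt. */
    static final class ConsumerBolt implements RichBolt {
        private final Consumer<String> consumer;

        ConsumerBolt(Consumer<String> consumer) {
            this.consumer = consumer;
        }

        @Override public void prepare() { }

        @Override public void execute(String tuple) {
            consumer.accept(tuple);
        }
    }

    private final Map<String, RichBolt> bolts = new HashMap<>();
    private final Map<String, Supplier<String>> spouts = new HashMap<>();

    /** Core overload: validate the id, then register the bolt. */
    MiniBuilder setBolt(String id, RichBolt bolt) {
        validateUnusedId(id);
        bolts.put(id, bolt);
        return this;
    }

    /** Convenience overload: wrap the simpler type and delegate to the core overload. */
    MiniBuilder setBolt(String id, Consumer<String> consumer) {
        return setBolt(id, new ConsumerBolt(consumer));
    }

    MiniBuilder setSpout(String id, Supplier<String> spout) {
        validateUnusedId(id);
        spouts.put(id, spout);
        return this;
    }

    RichBolt getBolt(String id) {
        return bolts.get(id);
    }

    /** Ids share one namespace: a bolt may not reuse a spout's id, and vice versa. */
    private void validateUnusedId(String id) {
        if (bolts.containsKey(id)) {
            throw new IllegalArgumentException("Bolt has already been declared for id " + id);
        }
        if (spouts.containsKey(id)) {
            throw new IllegalArgumentException("Spout has already been declared for id " + id);
        }
    }
}
```

Because only the core overload touches the maps, validation happens exactly once per registration no matter which overload the caller used.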

    public SpoutDeclarer setSpout(String id, IRichSpout spout) throws IllegalArgumentException {
        return setSpout(id, spout, null);
    }


    public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) throws IllegalArgumentException {
        validateUnusedId(id);
        initCommon(id, spout, parallelismHint);
        spouts.put(id, spout);
        return new SpoutGetter(id);
    }


    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier) throws IllegalArgumentException {
        return setSpout(id, supplier, null);
    }


    public SpoutDeclarer setSpout(String id, SerializableSupplier<?> supplier, Number parallelismHint) throws IllegalArgumentException {
        return setSpout(id, new LambdaSpout(supplier), parallelismHint);
    }

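This part defines the setSpout method. It works like setBolt: it checks the id, creates a ComponentCommon object through initCommon, and registers the Spout.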

    private ComponentCommon getComponentCommon(String id, IComponent component) {
        ComponentCommon ret = new ComponentCommon(commons.get(id));
        OutputFieldsGetter getter = new OutputFieldsGetter();
        component.declareOutputFields(getter);
        ret.set_streams(getter.getFieldsDeclaration());
        return ret;
    }

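This part defines the getComponentCommon method. It copies the ComponentCommon stored for the given id, asks the component to declare its output fields through an OutputFieldsGetter, and sets the resulting stream declarations on the copy.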
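The callback pattern used here, handing the component a collector object and reading the collected declarations back afterwards, can be sketched in plain Java. The types below are hypothetical simplifications: `Declarer`, `Component`, and `Getter` only mimic the roles of OutputFieldsDeclarer, IComponent, and OutputFieldsGetter, and a stream is reduced to a list of field names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: collecting a component's declared output streams. */
final class OutputFieldsSketch {
    /** What a component calls to announce one output stream and its fields. */
    interface Declarer {
        void declareStream(String streamId, String... fields);
    }

    /** A component knows its own outputs and reports them when asked. */
    interface Component {
        void declareOutputFields(Declarer declarer);
    }

    /** Records every declaration it receives, rejecting a stream id declared twice. */
    static final class Getter implements Declarer {
        private final Map<String, List<String>> streams = new HashMap<>();

        @Override
        public void declareStream(String streamId, String... fields) {
            if (streams.containsKey(streamId)) {
                throw new IllegalArgumentException("Fields for " + streamId + " already set");
            }
            streams.put(streamId, Arrays.asList(fields));
        }

        Map<String, List<String>> getFieldsDeclaration() {
            return streams;
        }
    }

    /** Same shape as getComponentCommon: pass in a fresh getter, then read it back. */
    static Map<String, List<String>> collectStreams(Component component) {
        Getter getter = new Getter();
        component.declareOutputFields(getter);
        return getter.getFieldsDeclaration();
    }
}
```

The builder never needs to know how a component computes its outputs; it only supplies the collector and keeps whatever was declared.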

    private void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException {
        ComponentCommon common = new ComponentCommon();
        common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
        if (parallelism != null) {
            int dop = parallelism.intValue();
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.set_parallelism_hint(dop);
        }
        Map<String, Object> conf = component.getComponentConfiguration();
        if (conf != null) {
            common.set_json_conf(JSONValue.toJSONString(conf));
        }
        commons.put(id, common);
    }

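This part defines the initCommon method, which initializes the ComponentCommon object: it starts with an empty input map, sets the parallelism hint when one is given (rejecting values below 1), and stores the component's own configuration as a JSON string.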
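One detail of the parallelism check is easy to miss: the hint is a `Number`, and the code compares `intValue()`, so a fractional value is truncated before it is validated. The sketch below reproduces just that logic in plain Java; `InitCommonSketch` and its `Common` class are hypothetical stand-ins for TopologyBuilder and the Thrift ComponentCommon struct.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: the parallelism-hint handling from initCommon. */
final class InitCommonSketch {
    /** Stand-in for ComponentCommon; a null hint means "not set". */
    static final class Common {
        Integer parallelismHint;
    }

    private final Map<String, Common> commons = new HashMap<>();

    void initCommon(String id, Number parallelism) {
        Common common = new Common();
        if (parallelism != null) {
            int dop = parallelism.intValue(); // truncates toward zero: 2.7 -> 2, 0.9 -> 0
            if (dop < 1) {
                throw new IllegalArgumentException("Parallelism must be positive.");
            }
            common.parallelismHint = dop;
        }
        commons.put(id, common);
    }

    Integer hintFor(String id) {
        return commons.get(id).parallelismHint;
    }
}
```

Passing `null` leaves the hint unset, which is what the two-argument setBolt and setSpout overloads do; passing `0.9` is rejected because it truncates to 0.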
