
Storm Source Code Analysis (12)


Table of Contents
  • Trident in Storm
    • What is Trident?
    • TridentTopology
      • newStream()
      • build()

2021SC@SDUSC

Trident in Storm

Two classes are central to reading the Trident code: TridentTopology and Stream. They serve as the entry points for studying the storm-trident source.

A Trident topology is constructed in two phases:

Phase one: build the logical Trident topology. This happens as we chain calls such as TridentTopology.newStream(…).each().groupBy()…; the result is a logical topology stored in a DefaultDirectedGraph field named _graph.

Phase two: compile that logical topology into a Storm topology. This is done in TridentTopology's build() method.
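The same two-phase pattern (fluent calls only record a logical graph; a later build() compiles it) can be sketched with the standard library. All names below are hypothetical stand-ins, not Storm API:

```java
import java.util.*;

// Sketch of the "record a logical graph first, compile later" pattern used by
// TridentTopology. LogicalTopology and its methods are hypothetical.
class LogicalTopology {
    private final Map<String, List<String>> graph = new LinkedHashMap<>(); // node -> downstream nodes
    private String last;

    LogicalTopology newStream(String id) { graph.put(id, new ArrayList<>()); last = id; return this; }

    // Each fluent call only records a node and an edge; nothing executes yet.
    LogicalTopology each(String op) { graph.put(op, new ArrayList<>()); graph.get(last).add(op); last = op; return this; }

    // build() walks the recorded graph and "compiles" it into a physical plan.
    List<String> build() {
        List<String> plan = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : graph.entrySet()) {
            plan.add(e.getKey() + " -> " + e.getValue());
        }
        return plan;
    }
}
```

The real build() does far more (grouping, partitioning, parallelism), but the separation is the same: the fluent API mutates _graph, and only build() produces a Storm topology.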

What is Trident?

Trident is a high-level abstraction built on top of Storm, aimed at realtime computation. It offers high-throughput processing together with low-latency distributed queries and stateful stream processing. Trident is fully fault-tolerant and provides exactly-once processing semantics; in essence it is a high-level wrapper around transactional processing, which makes realtime data processing straightforward. Trident persists state in some form and, when a failure occurs, restores that state as needed.

Because Trident encapsulates the transactional machinery, we no longer need to learn the low-level batch APIs, which lowers the learning curve.

Trident processes messages in units of batches, i.e. multiple tuples at a time.

As a higher-level abstraction over Storm, Trident provides three main benefits:

(1) Common operations such as count and sum are packaged as ready-made methods that can be called directly, with no need to implement them yourself.
(2) It provides stream-processing primitives such as groupBy.
(3) It provides transaction support, guaranteeing that every piece of data is processed exactly once.
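The exactly-once guarantee in (3) typically rests on storing the last applied transaction id alongside the state, so that a replayed batch can be detected and skipped. A minimal stdlib sketch of that idea (all names hypothetical, not Storm's state API):

```java
import java.util.*;

// Sketch of transactional exactly-once state: keep the txid with the value and
// ignore a batch whose txid was already applied (i.e. a replay after failure).
class TxCountState {
    private long lastTxId = -1;
    private long count = 0;

    // Apply one batch of tuples under transaction txId.
    void applyBatch(long txId, List<String> batch) {
        if (txId <= lastTxId) return;  // replayed batch: already counted, skip
        count += batch.size();
        lastTxId = txId;
    }

    long count() { return count; }
}
```

Even if a batch is replayed after a failure, the count is unchanged, which is what "processed exactly once" means at the state level.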

TridentTopology

newStream()
    public Stream newStream(String txId, IRichSpout spout) {
        return newStream(txId, new RichSpoutBatchExecutor(spout));
    }
    
    public Stream newStream(String txId, IPartitionedTridentSpout spout) {
        return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
    }
    
    public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
    }
 
    public Stream newStream(String txId, ITridentDataSource dataSource) {
        if (dataSource instanceof IBatchSpout) {
            return newStream(txId, (IBatchSpout) dataSource);
        } else if (dataSource instanceof ITridentSpout) {
            return newStream(txId, (ITridentSpout) dataSource);
        } else if (dataSource instanceof IPartitionedTridentSpout) {
            return newStream(txId, (IPartitionedTridentSpout) dataSource);
        } else if (dataSource instanceof IOpaquePartitionedTridentSpout) {
            return newStream(txId, (IOpaquePartitionedTridentSpout) dataSource);
        } else {
            throw new UnsupportedOperationException("Unsupported stream");
        }
    }
 
    public Stream newStream(String txId, IBatchSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
    
    public Stream newStream(String txId, ITridentSpout spout) {
        Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
        return addNode(n);
    }
 
    protected Stream addNode(Node n) {
        registerNode(n);
        return new Stream(this, n.name, n);
    }
 
    protected void registerNode(Node n) {
        _graph.addVertex(n);
        if(n.stateInfo!=null) {
            String id = n.stateInfo.id;
            if(!_colocate.containsKey(id)) {
                _colocate.put(id, new ArrayList());
            }
            _colocate.get(id).add(n);
        }
    }

newStream's first parameter is txId and its second is an ITridentDataSource.
ITridentDataSource comes in several flavors: IBatchSpout, ITridentSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

In every case a SpoutNode is created and then added to _graph via registerNode (if the node's stateInfo is non-null it is also added to _colocate, though for a SpoutNode stateInfo is null). Note that the SpoutNode's SpoutType is SpoutNode.SpoutType.BATCH.
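The _colocate bookkeeping in registerNode is simply grouping nodes by their state id; in modern Java the same containsKey/put dance is usually written with computeIfAbsent. A stdlib sketch (Node here is a hypothetical stand-in for the real class):

```java
import java.util.*;

// Sketch of registerNode's _colocate bookkeeping: group nodes that share a
// state id so they can later be co-located. Registrar/Node are hypothetical.
class Registrar {
    static class Node { final String stateId; Node(String s) { stateId = s; } }

    final Set<Node> graph = new LinkedHashSet<>();
    final Map<String, List<Node>> colocate = new HashMap<>();

    void registerNode(Node n) {
        graph.add(n);
        if (n.stateId != null) {
            // same effect as the containsKey/put sequence in the original
            colocate.computeIfAbsent(n.stateId, k -> new ArrayList<>()).add(n);
        }
    }
}
```

A SpoutNode (stateId null in this sketch) lands only in the graph, matching the note above.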

build()
    public StormTopology build() {
        DefaultDirectedGraph graph = (DefaultDirectedGraph) _graph.clone();
        
        //......
        
        List<SpoutNode> spoutNodes = new ArrayList<>();
        
        // can be regular nodes (static state) or processor nodes
        Set<Node> boltNodes = new LinkedHashSet<>();
        for(Node n: graph.vertexSet()) {
            if(n instanceof SpoutNode) {
                spoutNodes.add((SpoutNode) n);
            } else if(!(n instanceof PartitionNode)) {
                boltNodes.add(n);
            }
        }
        
        Set<Group> initialGroups = new LinkedHashSet<>();
 
        //......
 
        for(Node n: boltNodes) {
            initialGroups.add(new Group(graph, n));
        }
        
        
        GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
        grouper.mergeFully();
        Collection<Group> mergedGroups = grouper.getAllGroups();
        
        
        
        // add identity partitions between groups
        for(IndexedEdge e: new HashSet<>(graph.edgeSet())) {
            if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {                
                Group g1 = grouper.nodeGroup(e.source);
                Group g2 = grouper.nodeGroup(e.target);
                // g1 being null means the source is a spout node
                if(g1==null && !(e.source instanceof SpoutNode))
                    throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
                if(g1==null || !g1.equals(g2)) {
                    graph.removeEdge(e);
                    PartitionNode pNode = makeIdentityPartition(e.source);
                    graph.addVertex(pNode);
                    graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
                    graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));                    
                }
            }
        }
        
        //......
        
        // add in spouts as groups so we can get parallelisms
        for(Node n: spoutNodes) {
            grouper.addGroup(new Group(graph, n));
        }
        
        grouper.reindex();
        mergedGroups = grouper.getAllGroups();
                
        
        Map<Node, String> batchGroupMap = new HashMap<>();
        List<Set<Node>> connectedComponents = new ConnectivityInspector<>(graph).connectedSets();
        for(int i=0; i<connectedComponents.size(); i++) {
            String groupId = "bg" + i;
            for(Node n: connectedComponents.get(i)) {
                batchGroupMap.put(n, groupId);
            }
        }
 
        Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
 
        TridentTopologyBuilder builder = new TridentTopologyBuilder();
 
        Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
        Map<Group, String> boltIds = genBoltIds(mergedGroups);
 
        for(SpoutNode sn: spoutNodes) {
            Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
 
            Map<String, Number> spoutRes = new HashMap<>(_resourceDefaults);
            spoutRes.putAll(sn.getResources());
 
            Number onHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
            Number offHeap = spoutRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
            Number cpuLoad = spoutRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
            SpoutDeclarer spoutDeclarer = null;
 
            if(sn.type == SpoutNode.SpoutType.DRPC) {
 
                spoutDeclarer = builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
                                                              (IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
            } else {
                ITridentSpout s;
                if(sn.spout instanceof IBatchSpout) {
                    s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
                } else if(sn.spout instanceof ITridentSpout) {
                    s = (ITridentSpout) sn.spout;
                } else {
                    throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
                    // TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
                }
                spoutDeclarer = builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
            }
 
            if(onHeap != null) {
                if(offHeap != null) {
                    spoutDeclarer.setMemoryLoad(onHeap, offHeap);
                }
                else {
                    spoutDeclarer.setMemoryLoad(onHeap);
                }
            }
 
            if(cpuLoad != null) {
                spoutDeclarer.setCPULoad(cpuLoad);
            }
        }
 
        for(Group g: mergedGroups) {
            if(!isSpoutGroup(g)) {
                Integer p = parallelisms.get(g);
                Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
                Map<String, Number> groupRes = g.getResources(_resourceDefaults);
 
                Number onHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
                Number offHeap = groupRes.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB);
                Number cpuLoad = groupRes.get(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT);
 
                BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
                                                 committerBatches(g, batchGroupMap), streamToGroup);
 
                if(onHeap != null) {
                    if(offHeap != null) {
                        d.setMemoryLoad(onHeap, offHeap);
                    }
                    else {
                        d.setMemoryLoad(onHeap);
                    }
                }
 
                if(cpuLoad != null) {
                    d.setCPULoad(cpuLoad);
                }
 
                Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
                for(PartitionNode n: inputs) {
                    Node parent = TridentUtils.getParent(graph, n);
                    String componentId = parent instanceof SpoutNode ?
                            spoutIds.get(parent) : boltIds.get(grouper.nodeGroup(parent));
                    d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
                }
            }
        }
        Map<String, Number> combinedMasterCoordResources = new HashMap<>(_resourceDefaults);
        combinedMasterCoordResources.putAll(_masterCoordResources);
        return builder.buildTopology(combinedMasterCoordResources);
    }

Here a TridentTopologyBuilder is created; for each SpoutNode, TridentTopologyBuilder.setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) is called to add the spout.
A spout of type IBatchSpout is wrapped into an ITridentSpout via BatchSpoutExecutor.
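Wrapping an IBatchSpout into an ITridentSpout via BatchSpoutExecutor is the adapter pattern: present a batch-oriented source through the interface the planner expects. A stdlib sketch with simplified, hypothetical interfaces (not the real Storm types):

```java
import java.util.*;

// Adapter-pattern sketch of BatchSpoutExecutor. SimpleBatchSpout and
// SimpleTridentSpout are simplified stand-ins for the real Storm interfaces.
interface SimpleBatchSpout { List<String> emitBatch(long batchId); }
interface SimpleTridentSpout { List<String> emitForTx(long txId); }

class BatchToTridentAdapter implements SimpleTridentSpout {
    private final SimpleBatchSpout delegate;
    BatchToTridentAdapter(SimpleBatchSpout d) { delegate = d; }

    // Forward each transaction's emit request to the wrapped batch source.
    @Override public List<String> emitForTx(long txId) { return delegate.emitBatch(txId); }
}
```

This is why the planner's else-branch can treat every non-DRPC spout uniformly as an ITridentSpout.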

The streamName here is the streamId, generated by UniqueIdGen.getUniqueStreamId: it is "s" followed by the _streamCounter count (e.g. 1, giving "s1"). txStateId is the txId supplied by the user. batchGroup is "bg" followed by the index of the element in connectedComponents (e.g. 0, giving "bg0"). The parallelism parameter is whatever the user set when building the topology.
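The id scheme just described (stream ids "s" + an incrementing counter, batch groups "bg" + the connected-component index) can be sketched as follows; this UniqueIdGen is a simplified stand-in for the real class:

```java
import java.util.*;

// Sketch of the id-generation scheme: stream ids are "s" + counter,
// batch-group ids are "bg" + the index of the node's connected component.
class UniqueIdGen {
    private int streamCounter = 0;
    String getUniqueStreamId() { streamCounter++; return "s" + streamCounter; }
}

class BatchGroups {
    // Assign every node of connected component i the batch group "bg" + i,
    // mirroring the loop over connectedComponents in build().
    static Map<String, String> assign(List<Set<String>> components) {
        Map<String, String> batchGroup = new HashMap<>();
        for (int i = 0; i < components.size(); i++) {
            for (String node : components.get(i)) batchGroup.put(node, "bg" + i);
        }
        return batchGroup;
    }
}
```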

After the spout is set up, its resource settings (e.g. memoryLoad, cpuLoad) are applied. The bolts are then set up, using SubtopologyBolt, along with their own resource settings.
Finally, TridentTopologyBuilder.buildTopology is called.

Reference: https://blog.csdn.net/weixin_45366499/article/details/112008176
