
Kafka Streams

The Kafka Streams API for creating a topology looks like this (Scala DSL):

  val firstStream = builder.stream[String, String](topic = "from-basic-topic")

Looking at the source of builder, it is a StreamsBuilder, which internally delegates to an InternalStreamsBuilder.

Below is an excerpt from InternalStreamsBuilder. The key piece is GraphNode: these nodes form the DAG behind the DSL.

    private static final String TOPOLOGY_ROOT = "root";
    private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);

    protected final GraphNode root = new GraphNode(TOPOLOGY_ROOT) {
        @Override
        public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
            // no-op for root node
        }
    };

Part of the GraphNode code:

public abstract class GraphNode {

    private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
    private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
    private final String nodeName;
    private boolean keyChangingOperation;
    private boolean valueChangingOperation;
    private boolean mergeNode;
    private Integer buildPriority;
    private boolean hasWrittenToTopology = false;

    // ...
}
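To make the adjacency-list idea concrete, here is a minimal self-contained sketch (plain Java, not Kafka code; the class and method names are illustrative) of a node that keeps both child and parent links, as GraphNode does:

```java
import java.util.Collection;
import java.util.LinkedHashSet;

// Toy analogue of GraphNode: each node stores both its children and its
// parents, so the overall structure is a DAG kept as adjacency lists.
class ToyNode {
    final String name;
    final Collection<ToyNode> childNodes = new LinkedHashSet<>();
    final Collection<ToyNode> parentNodes = new LinkedHashSet<>();

    ToyNode(String name) {
        this.name = name;
    }

    // Linking a child updates both directions, which is the bookkeeping
    // that lets the builder later walk the graph from the root.
    void addChild(ToyNode child) {
        childNodes.add(child);
        child.parentNodes.add(this);
    }
}
```

LinkedHashSet preserves insertion order while rejecting duplicate edges, which is why it is a natural fit here.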

The childNodes/parentNodes collections show that this is an adjacency-list representation of a graph. Take the peek operation as an example:

    @Override
    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action,
                              final Named named) {
        Objects.requireNonNull(action, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
        // ProcessorGraphNode is a GraphNode that carries processing logic
        final ProcessorParameters<K, V, ?, ?> processorParameters =
            new ProcessorParameters<>(new KStreamPeek<>(action, true), name);
        final ProcessorGraphNode<K, V> peekNode =
            new ProcessorGraphNode<>(name, processorParameters);

        // Add the new node to the DAG; graphNode is the node backing the current KStream
        builder.addGraphNode(graphNode, peekNode);

        return new KStreamImpl<>(
            name,
            keySerde,
            valueSerde,
            subTopologySourceNodes,
            repartitionRequired,
            peekNode,
            builder);
    }

The KStreamImpl constructor makes this clear: a KStream is essentially a DAG node wrapped in one more layer.

    KStreamImpl(final String name,
                final Serde<K> keySerde,
                final Serde<V> valueSerde,
                final Set<String> subTopologySourceNodes,
                final boolean repartitionRequired,
                final GraphNode graphNode,
                final InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
        this.repartitionRequired = repartitionRequired;
    }

In short, the Kafka Streams DSL is really just building a DAG of processor nodes. The later optimization passes and the more complex windowing machinery are beyond the scope of this note.
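The whole flow above can be sketched end to end in a toy model (plain Java, every name hypothetical, not the real Kafka classes): each operator call creates a node, hangs it under the current node, and returns a new "stream" handle wrapping the new node, just as peek returns a fresh KStreamImpl around peekNode. A final pass then walks the graph from the root, standing in for the step where each GraphNode writes itself to the topology:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;

// Toy model of how the Streams DSL grows a DAG.
class ToyTopology {
    static class Node {
        final String name;
        final Collection<Node> children = new LinkedHashSet<>();
        Node(String name) { this.name = name; }
    }

    final Node root = new Node("root");

    // Stands in for KStreamImpl: a thin handle around the current DAG node.
    static class Stream {
        final Node node;
        Stream(Node node) { this.node = node; }

        // Stands in for an operator like peek(): create a node, link it
        // under the current node, and return a handle to the new node.
        Stream op(String name) {
            Node next = new Node(name);
            node.children.add(next);
            return new Stream(next);
        }
    }

    Stream stream(String topic) {
        Node source = new Node("source:" + topic);
        root.children.add(source);
        return new Stream(source);
    }

    // Breadth-first walk from the root, standing in for the pass where
    // each node is written into the final topology.
    List<String> build() {
        List<String> order = new ArrayList<>();
        Deque<Node> queue = new ArrayDeque<>(root.children);
        while (!queue.isEmpty()) {
            Node n = queue.poll();
            order.add(n.name);
            queue.addAll(n.children);
        }
        return order;
    }
}
```

Chaining calls such as `builder.stream("from-basic-topic").op("peek").op("mapValues")` grows the graph one node per call, and build() then visits the nodes in topology order.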

Reprinted from www.mshxw.com. Original article: https://www.mshxw.com/it/677164.html