栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

storm源码分析研究(三)

storm源码分析研究(三)

2021SC@SDUSC

spout源码分析(二)

2021SC@SDUSC

在计算任务时需要的数据是由Spout提供的,所以Spout可以说是Storm中的消息源,它一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据,然后发送给tuple元组的。

输出是通过Spout输出收集器发送的,即SpoutOutputCollector,而SpoutOutputCollector的接口是ISpoutOutputCollector。

编程人员一般可通过OutputFieldsDeclarer类的declareStream()方法来声明多个流,指定数据将要发送的流,然后使用SpoutOutputCollector的emit方法将数据发送

ISpoutOutputCollector.java
package org.apache.storm.spout;

import java.util.List;
import org.apache.storm.task.IErrorReporter;


public interface ISpoutOutputCollector extends IErrorReporter {

    List emit(String streamId, List tuple, Object messageId);

    void emitDirect(int taskId, String streamId, List tuple, Object messageId);

    long getPendingCount();

    void flush();
}

 

emit方法:
用来向外发送数据,它的返回值是该消息所有发送目标的Taskld集合。
输入参数streamld表示消息将被输出到的流;
tuple为要输出的消息列表;
messageld表示输出消息的标记信息。如果messageld被设置为null, Storm将不会追踪该消息,否则它会被用来追踪所发出消息的处理情况。

emitDirect方法:
参数与emit方法相似,主要区别在于使用emitDirect时, 只有由参数taskld所指定的Task才可以接收这条消息。
这个方法要求与参数streamld相对应的流必须被定义为直接流,同时接收端的Task也必须以直接分组 ( Direct Grouping ) 的方式来接收消息,否则会有异常抛出。另外,需要注意的是,如果没有下游节点接收该消息,那么该消息其实也就没有被真正发送。

SpoutOutputCollector.java

在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。

package org.apache.storm.spout;

import java.util.List;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.utils.Utils;

public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        this.delegate = delegate;
    }

    @Override
    public List emit(String streamId, List tuple, Object messageId) {
        return delegate.emit(streamId, tuple, messageId);
    }
    public List emit(List tuple, Object messageId) {
        return delegate.emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
        public List emit(String streamId, List tuple) {
        return emit(streamId, tuple, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, List tuple, Object messageId) {
        delegate.emitDirect(taskId, streamId, tuple, messageId);
    }
    public void emitDirect(int taskId, List tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }
    public void emitDirect(int taskId, String streamId, List tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }
  public void emitDirect(int taskId, List tuple) {
        emitDirect(taskId, tuple, null);
    }

    @Override
    public void flush() {
        delegate.flush();
    }

    @Override
    public void reportError(Throwable error) {
        delegate.reportError(error);
    }

    @Override
    public long getPendingCount() {
        return delegate.getPendingCount();
    }
}

 

List emit(String streamId, List tuple, Object messageId):
指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息完全处理了,就会回调ack方法,否则会回调fail方法。

List emit(List tuple, Object messageId):
emit的重载方法,这没有指定streamid,故采用默认的streamid

List emit(String streamId, List tuple):
emit的重载方法,这没有指定streamid,故采用默认的streamid,因为没有messageid,故ack方法和fail方法不会被调用

reportError(Throwable error):
处理异常

代理模式:
SpoutOutputCollector实际上是一个代理类,持有ISpoutOutputCollector类型的对象delegate,具体的执行都是通过_delegate调用相应的方法来实现的。

参考链接:https://blog.csdn.net/qq_29201447/article/details/81667876

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号