2021SC@SDUSC
spout源码分析(一)2021SC@SDUSC
- spout源码分析(一)
- 核心概念介绍
- ISpout.java
- ShellSpout.java
2021SC@SDUSC 核心概念介绍
1、结构:
Spout是storm的核心组件之一,最源头的接口是IComponent。
2、发送:
当Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。Spout可以发射多个流,可以定义多个流(即定义多个stream),也可以使用方法来发射指定的流。
3、重要结构:
Spout的重要方法是nextTuple,nextTuple方法发射一个新的元组到Topology,如果没有新的元组发射,则直接返回。注意任务Spout的nextTuple方法都不要实现成阻塞的,因为storm是在相同的线程中调用spout的方法。
Spout的另外两个重要方法是ack和fail方法,当spout发射的元组被拓扑成功处理时,调用ack方法,当处理失败时,调用fail方法,此外,ack和fail方法仅被可靠的spout调用。
ISpout接口:
storm实现主要依靠以下几个函数,全局代码如下:
package org.apache.storm.spout;
import java.io.Serializable;
import java.util.Map;
import org.apache.storm.task.TopologyContext;
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
Strom支持所有的基本类型,当它使用元组作为数据模型,元组中的每个字段都可以是任何类型的对象。而如果要使用自己定义的类型,需要为自己定义的类型实现并且注册一个serializer。每个节点还必须要为输出的元组定义字段名称。
部分函数解释:
open():
当该组件的任务在集群上初始化时调用。它为spout提供了执行spout的环境。
close():
当ISpout即将关闭时调用。不能保证会调用close,因为supervisor会杀死集群上的的worker进程。
activate():
当spout从非激活模式被激活时调用。
deactivate():
当spout失效时调用。
重载函数如下:
public void open(MaptopoConf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.context = context; if (topoConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) { workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)); } else { workerTimeoutMills = 1000 * ObjectReader.getInt(topoConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); } process = new ShellProcess(command); if (!env.isEmpty()) { process.setEnv(env); } Number subpid = process.launch(topoConf, context, changeDirectory); LOG.info("Launched subprocess with pid " + subpid); logHandler = ShellUtils.getLogHandler(topoConf); logHandler.setUpContext(ShellSpout.class, process, this.context); heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } @Override public void close() { heartBeatExecutorService.shutdownNow(); process.destroy(); running = false; } @Override public void nextTuple() { this.sendSyncCommand("next", ""); } @Override public void ack(Object msgId) { this.sendSyncCommand("ack", msgId); } @Override public void fail(Object msgId) { this.sendSyncCommand("fail", msgId); } @Override public void activate() { LOG.info("Start checking heartbeat..."); // prevent timer to check heartbeat based on last thing before activate setHeartbeat(); if (heartBeatExecutorService.isShutdown()) { //In case deactivate was called before heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS); this.sendSyncCommand("activate", ""); } @Override public void deactivate() { this.sendSyncCommand("deactivate", ""); heartBeatExecutorService.shutdownNow(); }
void open(Map
参数:
topoconf :
Storm关于这个Spout的配置
context :
用来获取该Spout任务的信息,包括任务id,组件id,输入输出信息等等
collector :
用来从这个Spout里发送元组,元组可以在任何时间里发送,包括open和close函数里。collector是线程安全的,应该被作为一个实例对象保存到Spout对象里。
void ack(Object msgId):
以msgId消息告诉Storm这个Spout已经成功输出了该元组
void activate():
激活Spout,Spout从deactivate模式转化为activate模式,Spout开始调用nextTuple输出数据。
void close():
关闭Spout
void deactivate():
解除激活Spout,Spout从activate模式转化为deactivate模式,Spout停止调用nextTuple输出数据
void fail(Object msgId):
以msgId消息告诉Storm这个Spout输出该元组失败,主要用于将该元组重新放回消息队列,以在一段时间后重发该元组
void nextTuple():
调用该函数请求Storm发送元组到Output Collector,这个函数不应该是阻塞的,当没有元组发送时,一般调用sleep,以充分利用CPU。
参考链接:
https://blog.csdn.net/wdasdaw/article/details/48896321
https://xlucas.blog.csdn.net/article/details/55301577



