Storm UI 中不会显示输入和输出的具体内容。在 Storm UI 中,您只能看到已发出元组的数量、处理时间、集群配置以及集群的运行状况。
要查看输入和输出,请使用日志(logger)机制,然后检查 Storm 安装包 logs 文件夹下的各个 worker 日志文件。要在 Storm 中创建拓扑,您需要两样东西:一个 Spout 和一个 Bolt。请参见下面的示例代码:
SampleSpout.java
import java.util.ArrayList;import java.util.List;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;public class SampleSpout implements IRichSpout{ SpoutOutputCollector collector; int i=0; List<Object> tupleList; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public void activate() { // TODO Auto-generated method stub } @Override public void deactivate() { // TODO Auto-generated method stub } @Override public void nextTuple() { tupleList=new ArrayList<Object>(); tupleList.add("storm"+i); tupleList.add(i); collector.emit(tupleList,i); i++; } @Override public void ack(Object msgId) { // TODO Auto-generated method stub } @Override public void fail(Object msgId) { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; }}SampleBolt.java
import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;public class SampleBolt implements IBasicBolt { private static Logger log = LoggerFactory.getLogger(SampleBolt.class); @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } @Override public void prepare(Map stormConf, TopologyContext context) { // TODO Auto-generated method stub } @Override public void execute(Tuple input, BasicOutputCollector collector) { log.info(input.getValues().toString()+"output values"); } @Override public void cleanup() { // TODO Auto-generated method stub }}SampleTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * Entry point that wires {@code SampleSpout} to {@code SampleBolt} and submits
 * the resulting topology to a {@link LocalCluster} under the name {@code "test"}.
 */
public class SampleTopology {

    /**
     * Builds the two-component topology (spout id {@code "sampleSpout"}, bolt id
     * {@code "sampleBolt"}, shuffle grouping between them), enables debug in the
     * configuration, and submits it. The cluster is not shut down by this method.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Config debugConf = new Config();
        debugConf.setDebug(true);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sampleSpout", new SampleSpout());
        builder.setBolt("sampleBolt", new SampleBolt())
               .shuffleGrouping("sampleSpout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("test", debugConf, builder.createTopology());
    }
}


