是的,有可能:
TopologyBuilder b = new TopologyBuilder();b.setSpout("topic_1", new KafkaSpout(...));b.setSpout("topic_2", new KafkaSpout(...));b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");您也可以使用任何其他分组。
更新:
为了区分使用者螺栓中的元组(即topic_1或topic_2),有两种可能性:
1)您可以使用操作员ID(如@ user-4870385所建议):
if(input.getSourceComponent().equalsIgnoreCase("topic_1")) { //do something} else { //do something}2)您可以使用流名称(@zenbeni建议)。对于这种情况,两个喷口都需要声明命名流,而螺栓需要通过流名称连接到喷口:
public class MyKafkaSpout extends KafkaSpout { final String streamName; public MyKafkaSpout(String stream) { this.streamName = stream; } // other stuff omitted @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // compare KafkaSpout.declareOutputFields(...) declarer.declare(streamName, _spoutConfig.scheme.getOutputFields()); }}构建拓扑,现在需要使用流名称:
TopologyBuilder b = new TopologyBuilder();b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");MyBolt现在可以在流名称中区分输入元组:
// in my MyBolt.execute():if(input.getSourceStreamId().equals("Topic1")) { // do something} else { // do something}讨论:
虽然使用流名称的 第二种 方法更自然(根据@zenbeni),但 第 一种方法更灵活(IHMO)。流名称直接由spout /
bolt声明(即,在编写spout / bolt代码时);与此相反,当拓扑放在一起操作者ID分配(即,在时间喷口/螺栓 使用 )。
假设我们得到三个螺栓作为类文件(没有源代码)。前两个应该用作生产者,并且都用相同的名称声明输出流。如果第三个使用者按流区分输入元组,则此方法将无效。即使两个给定的生产者螺栓都声明了不同的输出流名称,预期的输入流名称也可能在使用者螺栓中进行了硬编码,并且可能不匹配。因此,它也不起作用。但是,如果使用者螺栓使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配期望的组件ID。
当然,有可能从给定的类继承(如果未声明
final并进行覆盖
declareOutputFields(...),以便分配自己的流名称。但是,这是要做的更多工作。



