栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

storm代码阅读(十二)

storm代码阅读(十二)

2021SC@SDUSC

storm代码阅读(十二)

2021SC@SDUSC

Task部分阅读(七)
  • storm代码阅读(十二)
    • Storm中传输的消息及序列化
    • 反序列化

Storm中传输的消息及序列化

下面将介绍Storm如何对消息进行序列化并分析实际的传输内容。

KryoTupleSerializer类用于实现消息的序列化。

代码如下:

public class KryoTupleSerializer implements ITupleSerializer {
    private KryoValuesSerializer kryo;
    private SerializationFactory.IdDictionary ids;
    private Output kryoOut;

    public KryoTupleSerializer(final Map conf, final GeneralTopologyContext context) {
        kryo = new KryoValuesSerializer(conf);
        kryoOut = new Output(2000, 2000000000);
        ids = new SerializationFactory.IdDictionary(context.getRawTopology());
    }

    @Override
    public byte[] serialize(Tuple tuple) {
        try {

            kryoOut.clear();
            kryoOut.writeInt(tuple.getSourceTask(), true);
            kryoOut.writeInt(ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
            tuple.getMessageId().serialize(kryoOut);
            kryo.serializeInto(tuple.getValues(), kryoOut);
            return kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

从public byte[] serialize(Tuple tuple)中可以看出,Strom实际传输的消息为:
SourceTaskId:int; StreamId:int; messageId:MessageId; Tuplevalues: List< Object>

其中,流序号是对组件中所有的流排序后确定的统一编号,IdDictionary类会根据输入的Topology定义获取所有的组件名字以及它们的流编号,然后进行编号。Tuplevalues为用户实际发送的消息。

MessageId类存储从消息的RootId到其衍生消息的消息ID的异或值的映射关系,它用于跟踪消息,内含一个哈希表。

MessageId类中用于实现序列化的方法如下:

public void serialize(Output out) throws IOException {
    out.writeInt(anchorsToIds.size(), true);
    for (Entry anchorToId : anchorsToIds.entrySet()) {
        out.writeLong(anchorToId.getKey());
        out.writeLong(anchorToId.getValue());
    }
}

序列化的数据为< 元素个数,[RootId,AckVal]+>,而反序列化方法与其类似。

反序列化

KryoTupleDeserializer类用来对收到的消息字节数组进行反序列化。

代码如下:

public TupleImpl deserialize(byte[] ser) {
    try {
        kryoInput.setBuffer(ser);
        int taskId = kryoInput.readInt(true);
        int streamId = kryoInput.readInt(true);
        String componentName = context.getComponentId(taskId);
        String streamName = ids.getStreamName(componentName, streamId);
        MessageId id = MessageId.deserialize(kryoInput);
        List values = kryo.deserializeFrom(kryoInput);
        return new TupleImpl(context, values, componentName, taskId, streamName, id);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
 

反序列化的过程是:首先依次读取TaskId,并通过读取的流序号查找映射表获取流名称,接下来调用MessageId反序列化方法读取MessageId信息,然后读取用户发送的消息,最后构建TupleImpl对象并返回。注意这里的流序号只用于消息的传输,其他地方看到的流序号实际上对应于这里的流名称。

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号