2021SC@SDUSC
storm代码阅读(十二)2021SC@SDUSC
Task部分阅读(七)- storm代码阅读(十二)
- Storm中传输的消息及序列化
- 反序列化
下面将介绍Storm如何对消息进行序列化并分析实际的传输内容。
KryoTupleSerializer类用于实现消息的序列化。
代码如下:
public class KryoTupleSerializer implements ITupleSerializer {
private KryoValuesSerializer kryo;
private SerializationFactory.IdDictionary ids;
private Output kryoOut;
public KryoTupleSerializer(final Map conf, final GeneralTopologyContext context) {
kryo = new KryoValuesSerializer(conf);
kryoOut = new Output(2000, 2000000000);
ids = new SerializationFactory.IdDictionary(context.getRawTopology());
}
@Override
public byte[] serialize(Tuple tuple) {
try {
kryoOut.clear();
kryoOut.writeInt(tuple.getSourceTask(), true);
kryoOut.writeInt(ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
tuple.getMessageId().serialize(kryoOut);
kryo.serializeInto(tuple.getValues(), kryoOut);
return kryoOut.toBytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
从public byte[] serialize(Tuple tuple)中可以看出,Strom实际传输的消息为:
SourceTaskId:int; StreamId:int; messageId:MessageId; Tuplevalues: List< Object>
其中,流序号是对组件中所有的流排序后确定的统一编号,IdDictionary类会根据输入的Topology定义获取所有的组件名字以及它们的流编号,然后进行编号。Tuplevalues为用户实际发送的消息。
MessageId类存储从消息的RootId到其衍生消息的消息ID的异或值的映射关系,它用于跟踪消息,内含一个哈希表。
MessageId类中用于实现序列化的方法如下:
public void serialize(Output out) throws IOException {
out.writeInt(anchorsToIds.size(), true);
for (Entry anchorToId : anchorsToIds.entrySet()) {
out.writeLong(anchorToId.getKey());
out.writeLong(anchorToId.getValue());
}
}
序列化的数据为< 元素个数,[RootId,AckVal]+>,而反序列化方法与其类似。
反序列化KryoTupleDeserializer类用来对收到的消息字节数组进行反序列化。
代码如下:
public TupleImpl deserialize(byte[] ser) {
try {
kryoInput.setBuffer(ser);
int taskId = kryoInput.readInt(true);
int streamId = kryoInput.readInt(true);
String componentName = context.getComponentId(taskId);
String streamName = ids.getStreamName(componentName, streamId);
MessageId id = MessageId.deserialize(kryoInput);
List
反序列化的过程是:首先依次读取TaskId,并通过读取的流序号查找映射表获取流名称,接下来调用MessageId反序列化方法读取MessageId信息,然后读取用户发送的消息,最后构建TupleImpl对象并返回。注意这里的流序号只用于消息的传输,其他地方看到的流序号实际上对应于这里的流名称。



