For the sink side, the implementation classes you need to define yourself are:
Dynamic-topic implementation class: DynKeyedSerializationSchema.java
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    public class DynKeyedSerializationSchema implements KeyedSerializationSchema<Device> {

        private static final String TOPIC_PREFIX = "device_sink_";
        private final SerializationSchema<Device> serializationSchema;

        public DynKeyedSerializationSchema(SerializationSchema<Device> serializationSchema) {
            this.serializationSchema = serializationSchema;
        }

        @Override
        public byte[] serializeKey(Device item) {
            // No key is needed; records are routed by topic name only
            return new byte[0];
        }

        @Override
        public byte[] serializeValue(Device item) {
            return this.serializationSchema.serialize(item);
        }

        @Override
        public String getTargetTopic(Device item) {
            // Custom rule: hash the device id, take it modulo 100, then the absolute value
            return TOPIC_PREFIX + Math.abs(item.getDevId().hashCode() % 100);
        }
    }
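The topic-naming rule in getTargetTopic can be checked in isolation, without Flink or Kafka. A minimal sketch (TopicNameDemo and its targetTopic helper are hypothetical names, not part of the project):

```java
public class TopicNameDemo {

    // Same rule as getTargetTopic: prefix + |hash(id) % 100|
    // hashCode() % 100 is in [-99, 99], so Math.abs is safe here
    // (no Integer.MIN_VALUE overflow) and the suffix is always 0..99.
    static String targetTopic(String devId) {
        return "device_sink_" + Math.abs(devId.hashCode() % 100);
    }

    public static void main(String[] args) {
        // Every device id maps deterministically to one of 100 sink topics
        System.out.println(targetTopic("device-001"));
    }
}
```

One consequence of this rule worth noting: the mapping is stable for a given id, but the 100 target topics must exist (or auto-creation must be enabled) before the job writes to them.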
Serialization of the T-typed payload: DynSerializationSchema.java
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DynSerializationSchema implements KafkaSerializationSchema<Device> {

        private final String topic;
        private transient ObjectMapper mapper;

        public DynSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Device item, Long timestamp) {
            if (mapper == null) {
                // Created lazily: ObjectMapper is not serializable, so it cannot
                // be shipped with the schema instance
                mapper = new ObjectMapper();
            }
            byte[] bytes;
            try {
                bytes = mapper.writeValueAsBytes(item);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Failed to serialize Device to JSON", e);
            }
            // The dynamic topic is NOT chosen here; this class only handles serialization
            return new ProducerRecord<>(topic, bytes);
        }
    }
Once the Flink stream computation is done, the sink is wired up via the addSink call.
Because the records being written are of a generic type T, pay attention to the following when setting the producer properties:
key.serializer / value.serializer value: org.apache.kafka.common.serialization.ByteArraySerializer
In most demos these are set to the String serializer (org.apache.kafka.common.serialization.StringSerializer); here T is not converted to a JSON string but to raw bytes, so the byte-array serializer is used instead.
    .addSink(new FlinkKafkaProducer<>(
            topic,
            new DynKeyedSerializationSchema(new SerializationSchema<Device>() {
                @Override
                public byte[] serialize(Device item) {
                    return new DynSerializationSchema(topic)
                            .serialize(item, System.currentTimeMillis())
                            .value();
                }
            }),
            getProperties()))
    .setParallelism(parallelism_value);

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("zookeeper.connect", zookeeperAddress);
        // Consumer-oriented settings; note that Kafka config keys use dots, not hyphens
        properties.setProperty("group.id", "fan_consumer_group");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return properties;
    }
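One pitfall with these properties: Kafka configuration keys are dotted ("enable.auto.commit"), and a hyphenated spelling such as "enable-auto-commit" is a different key that Kafka silently ignores. A minimal sketch showing the mismatch with plain java.util.Properties (PropsDemo is a hypothetical name):

```java
import java.util.Properties;

public class PropsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Hyphenated spelling: stored under a key Kafka never looks up
        props.setProperty("enable-auto-commit", "true");
        System.out.println(props.getProperty("enable.auto.commit")); // null

        // Dotted spelling: the key Kafka actually reads
        props.setProperty("enable.auto.commit", "true");
        System.out.println(props.getProperty("enable.auto.commit")); // true
    }
}
```

Because unknown keys only produce a warning in the client logs at most, this kind of typo tends to surface as "my config has no effect" rather than as an error.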
How to dynamically assign topic partitions still needs further work; the overload that takes a partition parameter is not called here, so by default only partition 0 is used.



