如果您只是将String发送给Kafka:
public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{ private String topic; public ProducerStringSerializationSchema(String topic) { super(); this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) { return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8)); }}发送Java对象:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import org.apache.kafka.clients.producer.ProducerRecord; public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{ private String topic;private ObjectMapper mapper; public ObjSerializationSchema(String topic) { super(); this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) { byte[] b = null; if (mapper == null) { mapper = new ObjectMapper(); } try { b= mapper.writevalueAsBytes(obj); } catch (JsonProcessingException e) { // TODO } return new ProducerRecord<byte[], byte[]>(topic, b); } }在你的代码中
// Attach a Kafka sink to the stream: a FlinkKafkaProducer is built with producerTopic,
// an ObjSerializationSchema bound to that same topic, the properties returned by
// params.getProperties(), and the EXACTLY_ONCE semantic.
// NOTE(review): EXACTLY_ONCE presumably requires checkpointing to be enabled on the
// job and a compatible Kafka transaction timeout - confirm against the job setup.
.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));



