以下内容基于flink1.12
pom依赖
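<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.12.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.12.1</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
</dependencies>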
代码:
@Slf4j
public class ProcessFunction {
public static void main(String[] args) throws Exception {
// 声明环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
final Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "source");
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>("source_topic", new SimpleStringSchema(), properties);
// source
SingleOutputStreamOperator source = env.addSource(flinkKafkaConsumer)
.name("source")
.uid("source");
// 设置watermark
// transform
// 1.不使用tuple结构 直接使用自定义实体类
SingleOutputStreamOperator reduce = process(source);
// 2.使用tuple结构直接计算
SingleOutputStreamOperator> tuple = processOfTuple(source);
tuple.print();
// sink
FlinkKafkaProducer stringFlinkKafkaProducer = new FlinkKafkaProducer<>(
"sink_topic",
new KafkaSerializationSchemaWrapper<>(
"sink_topic", null, false, new SimpleStringSchema()),
properties,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
reduce.map(
(MapFunction) count -> {
final Gson gson = new GsonBuilder().create();
return gson.toJson(count);
}).addSink(stringFlinkKafkaProducer);
tuple.map((MapFunction, String>) Tuple2::toString)
.returns(TypeInformation.of(String.class)).addSink(stringFlinkKafkaProducer);
// 提交执行
env.execute("flink-test");
}
public static SingleOutputStreamOperator> processOfTuple(SingleOutputStreamOperator source){
SingleOutputStreamOperator> sum = source.flatMap((String line, Collector> collector) -> {
String[] tokens = line.split(",",-1);
// 输出结果 (word, 1)
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
return sum;
}
/**
 * Maintains a running count per word using the {@link Count} entity instead of tuples.
 *
 * <p>Each input line is split on commas and every non-empty token is emitted as
 * {@code Count(token, 1)}. Empty tokens are skipped, matching {@link #processOfTuple}. The stream
 * is keyed by word and reduced without a window, so an updated count is emitted per element.
 *
 * @param source stream of comma-separated lines
 * @return stream of running {@link Count} totals per word
 */
public static SingleOutputStreamOperator<Count> process(SingleOutputStreamOperator<String> source) {
    SingleOutputStreamOperator<Count> words = source
            .flatMap((FlatMapFunction<String, Count>) (line, collector) -> {
                // limit -1 keeps trailing empty strings; they are filtered out below
                for (String token : line.split(",", -1)) {
                    if (token.length() > 0) {
                        collector.collect(new Count(token, 1));
                    }
                }
            })
            // the lambda's output type is erased, so it is declared explicitly
            .returns(TypeInformation.of(Count.class));

    SingleOutputStreamOperator<Count> reduce = words
            .keyBy(Count::getWord)
            // combine two partial counts of the same word into a new Count
            .reduce((left, right) -> new Count(left.getWord(), left.getCount() + right.getCount()));
    return reduce;
}
public static class Count implements Serializable {
String word;
int count;
public Count(String word, int count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
@Override
public String toString() {
return "{" +
"word='" + word + ''' +
", count=" + count +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Count count1 = (Count) o;
return count == count1.count && Objects.equals(word, count1.word);
}
@Override
public int hashCode() {
return Objects.hash(word, count);
}
}
/** Formatter for timestamps written as {@code yyyy-MM-dd HH:mm:ss}. */
public static final DateTimeFormatter STANDARD_PATTERN_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
 * Extracts the {@code eventTime} field of a JSON message as epoch milliseconds.
 *
 * <p>The field is parsed with {@link #STANDARD_PATTERN_FORMATTER} and interpreted at UTC+8.
 * If the field is missing or JSON null, or if anything fails while parsing the message, the
 * current system time is returned instead and the problem is logged.
 *
 * @param message JSON text expected to contain an {@code eventTime} string
 * @return the event time in epoch milliseconds, or the current time as a fallback
 */
public static long getWaterMark(String message) {
    try {
        final JsonObject payload = new GsonBuilder().create().fromJson(message, JsonObject.class);
        final JsonElement rawEventTime = payload.get("eventTime");

        // Happy path: the field is present and carries a value.
        if (rawEventTime != null && !rawEventTime.isJsonNull()) {
            final LocalDateTime parsed =
                    LocalDateTime.parse(rawEventTime.getAsString(), STANDARD_PATTERN_FORMATTER);
            return parsed.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        }

        log.warn("input message {} do not contain `eventTime`," +
                " return `System.currentTimeMillis()` as eventTime", message);
        return System.currentTimeMillis();
    } catch (Exception exp) {
        // Covers malformed JSON, a null parse result and unparseable timestamps alike.
        log.error("extract `eventTime` from message {} fails," +
                " return `System.currentTimeMillis()` as eventTime", message, exp);
        return System.currentTimeMillis();
    }
}
}
附上本地安装kafka步骤(自行测试)
https://blog.csdn.net/sanhongbo/article/details/123574606



