
A simple Flink "Hello World"



This tutorial walks through a "Hello World" Flink job.

Flink's data-processing flow at the code level:
addSource (read the data) -> transformations on the resulting DataStream (the core business logic lives here, comparable to the Java Stream API operations) -> addSink (write the data out)
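The analogy with the Java Stream API can be made concrete. The sketch below is plain Java with no Flink dependency (the class name and sample data are made up for illustration); it mirrors the same source -> transform -> sink shape:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamAnalogy {
    public static void main(String[] args) {
        // "addSource": build a stream from a fixed collection
        List<String> words = Stream.of("hello,flink", "hello,world")
                // transform: split each line on commas, much like a Flink flatMap
                .flatMap(line -> Stream.of(line.split(",")))
                // "addSink": collect the results
                .collect(Collectors.toList());
        System.out.println(words); // [hello, flink, hello, world]
    }
}
```

The key difference is that the Java Stream pipeline runs once over a finite collection, while a Flink DataStream pipeline keeps running over an unbounded stream.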

Four ways to read data: from a collection, from Kafka, from a file, or from a custom source.
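As a sketch of those four options (method names are from the Flink 1.12 DataStream API; the topic name, file path, and broker address are placeholders):

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. From a collection
        DataStream<String> fromCollection = env.fromCollection(Arrays.asList("hello", "flink"));

        // 2. From Kafka (needs the flink-connector-kafka dependency)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        DataStream<String> fromKafka = env.addSource(
                new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props));

        // 3. From a file
        DataStream<String> fromFile = env.readTextFile("/path/to/input.txt");

        // 4. Custom source: implement SourceFunction and pass it to addSource
        DataStream<String> custom = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect("tick");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
    }
}
```

This article only exercises the Kafka source; the others follow the same addSource/env-method pattern.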

The code:

pom.xml



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
    </parent>
    <groupId>me</groupId>
    <artifactId>flink</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flink</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- skip tests during the build -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

package me.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;


public class StreamWordCount {

    public static void main(String[] args) throws Exception {

        // Create the stream-execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.5.42:9092");
        // The following parameters are optional
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Add an external data source to Flink
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer<>("topic2", new SimpleStringSchema(), properties));

        // Transform the stream -- the business logic goes here
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    // Emit each field through the collector
                    out.collect(field);
                }
            }
        });

        flatMapStream.print();
        // Write the results back to Kafka
        flatMapStream.addSink(
                new FlinkKafkaProducer<>("192.168.5.42:9092", "topic1", new SimpleStringSchema()));

        // Execute the job
        env.execute();
    }

}

Reprinted from www.mshxw.com
Original article: https://www.mshxw.com/it/820159.html