
Flink Basics Tutorial PDF (Flink: Dynamically Reading Configuration Files)


Contents

1. Writing to a file
2. Writing to Kafka
3. Writing to MySQL
4. Writing to Redis
5. Writing to Elasticsearch

1. Writing to a file
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class SinkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);
        // Read input lines from a local text file (note the escaped backslashes in the Windows path)
        DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\\atguigu05\\flink\\flink02-java\\src\\main\\resources\\clicks.txt");

        // Row-format file sink; roll to a new part file at 1 GB, after 5 minutes
        // of inactivity, or every 15 minutes, whichever comes first
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("./output"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .build()
                )
                .build();
        // The stream already carries Strings, so it can go straight to the sink
        dataStreamSource.addSink(sink);
        environment.execute();
    }
}
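The remaining examples construct an Event with (user, url, timestamp) and read its public user and url fields, but the class itself never appears in the post. A minimal POJO consistent with that usage might look like the following sketch; the field names and the toString format are inferred from the surrounding code, not taken from the source:

import java.sql.Timestamp;

// Hypothetical Event POJO, reconstructed from how the sink examples use it
public class Event {
    public String user;       // who clicked
    public String url;        // which URL was clicked
    public Long timestamp;    // when, as epoch milliseconds

    // Flink POJOs require a public no-argument constructor
    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}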

2. Writing to Kafka

Pipeline: Kafka source -> Flink transformation -> Kafka sink

Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");

        // Kafka source: consume raw lines from the "clicks" topic
        DataStreamSource<String> streamSource = environment.addSource(
                new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), properties));

        // Parse each comma-separated line into an Event, then back to a String
        SingleOutputStreamOperator<String> resultStream = streamSource.map(data -> {
            String[] words = data.split(",");
            return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim())).toString();
        });

        // Kafka sink: write the transformed records to a new topic "events"
        resultStream.addSink(new FlinkKafkaProducer<>("hadoop102:9092", "events", new SimpleStringSchema()));

        environment.execute();
    }
}
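To sanity-check the pipeline, you can feed comma-separated lines into the source topic and watch the sink topic with the standard Kafka console tools. The broker address and topic names below are taken from the code above; the sample record is invented, but matches the comma-split parsing:

# produce a test record into the source topic
$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic clicks
> Mary, ./home, 1000

# watch the transformed records arrive on the sink topic
$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic events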



3. Writing to MySQL

Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkTestMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );

        // Write each Event as one row; the clicks table needs two string columns
        streamSource.addSink(JdbcSink.sink(
                "insert into clicks values (?, ?)",
                (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        ));
        environment.execute();
    }
}
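The insert statement assumes a clicks table with two string columns in the test database. A matching schema could look like this; the column names are assumptions, since only the column count and order matter to the ? placeholders:

-- hypothetical schema matching "insert into clicks values (?, ?)"
CREATE TABLE test.clicks (
    user VARCHAR(100),
    url  VARCHAR(200)
);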

4. Writing to Redis

Maven dependency:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\\atguigu05\\flink\\flink02-java\\src\\main\\resources\\clicks.txt");

        // Connection pool config for the Redis instance
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();

        dataStreamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        environment.execute();
    }

    public static class MyRedisMapper implements RedisMapper<String> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET into a hash named "clicks"
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(String s) {
            // the raw line serves as the hash field...
            return s;
        }

        @Override
        public String getValueFromData(String s) {
            // ...and as the value
            return s;
        }
    }
}
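Once the job has run, the hash written by the sink can be inspected with redis-cli on the Redis host:

$ redis-cli -h hadoop102
hadoop102:6379> HGETALL clicks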

5. Writing to Elasticsearch

Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkElasticSearchTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );

        ArrayList<HttpHost> list = new ArrayList<>();
        list.add(new HttpHost("hadoop102", 9200));

        ElasticsearchSinkFunction<Event> sinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // Document payload: user -> url
                HashMap<String, String> map = new HashMap<>();
                map.put(event.user, event.url);

                // Build the index request
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")
                        .source(map);
                requestIndexer.add(request);
            }
        };

        streamSource.addSink(new ElasticsearchSink.Builder<>(list, sinkFunction).build());
        environment.execute();
    }
}
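After the job runs, the indexed documents can be queried back from the cluster, for example:

$ curl "http://hadoop102:9200/clicks/_search?pretty"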
