栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink——读取Kafka数据处理后存入StarRocks

Flink——读取Kafka数据处理后存入StarRocks

授权方式登录

    登录华为FusionInsight Manager下载flinkuser用户的授权文件user.keytab、krb5.conf准备LoginUtil工具类登录代码如下:
//login
String userPrincipal = "flinkuser";
String userKeytabPath = System.getProperty("user.dir") + File.separator
        + "conf"+File.separator+"user.keytab";
String krb5ConfPath = System.getProperty("user.dir") + File.separator
        + "conf"+File.separator+"krb5.conf";
LoginUtil.setJaasFile(userPrincipal, userKeytabPath);

Configuration configuration = new Configuration();
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, configuration);

Flink上下文环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

连接Kafka,读取数据

Properties properties = new Properties();
properties.setProperty("topic", "petertest1");
properties.setProperty("bootstrap.servers", "ip:port,ip:port,ip:port,ip:port,ip:port");

DataStream messageStream = env.addSource(new FlinkKafkaConsumer010<>(properties.getProperty("topic"),
        new SimpleStringSchema(),properties));

处理数据

//对数据做ETL,将数据封装成Bean
SingleOutputStreamOperator dataStreamSource = messageStream.map(new MapFunction() {
  @Override
  public TableData map(String value) throws Exception {
    System.out.println("receive kafka data:" + value);
    int siteId = random.nextInt(10000);
    int cityCode = random.nextInt(1000);
    return new TableData(siteId, cityCode, value, 1);
  }
});

连接StarRocks,写入数据

//将Bean类型的数据sink到starrocks
dataStreamSource.addSink(StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
                .field("siteid", DataTypes.INT())
                .field("citycode", DataTypes.SMALLINT())
                .field("username", DataTypes.VARCHAr(32))
                .field("pv", DataTypes.BIGINT())
                .build(),
        // the sink options
        StarRocksSinkOptions.builder()
                .withProperty("connector", "starrocks")
                .withProperty("jdbc-url", "jdbc:mysql://ip:port?characterEncoding=utf-8&useSSL=false")
                .withProperty("load-url", "ip:port;ip:port;ip:port")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("table-name", "table1")
                .withProperty("database-name", "example_db")
                //设置列分隔符
                .withProperty("sink.properties.column_separator", "\x01")
                //设置行分隔符
                .withProperty("sink.properties.row_delimiter", "\x02")
                //设置sink提交周期,这里设置10s提交一次
                .withProperty("sink.buffer-flush.interval-ms", "10000")
                .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
          slots[0] = streamRowData.getSiteid();
          slots[1] = streamRowData.getCitycode();
          slots[2] = streamRowData.getUsername();
          slots[3] = streamRowData.getPv();
        }
));

执行

env.execute();

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758759.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号