栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 消费 kafka sink (KUDU,Mysql,Kafka)

Flink 消费 kafka sink (KUDU,Mysql,Kafka)

结合Flink一周的使用经验,用到了三个模块 (source于kafka,sink到Mysql,kudu,kafka) source kafka是最简单的
//导入的包
import scala.collection.JavaConverters._
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import java.util.Properties


//主要流程
val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9092")
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafkagroup")
    // BUG FIX: a consumer needs DE-serializers; the original configured
    // StringSerializer for both DESERIALIZER keys, which fails as soon as
    // the underlying KafkaConsumer is constructed with these properties.
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // Start from the EARLIEST offset when the group has no committed offset.
    // (The original comment said "latest", contradicting the "earliest" value;
    // the value is kept, the comment is corrected.)
    prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Kafka source: each record value is deserialized to a String by
    // SimpleStringSchema (key is ignored).
    val ds = env.addSource(
      new FlinkKafkaConsumer[String](
        "topic_name",
        new SimpleStringSchema(),
        prop
      )
    )
    //开始转换 我Kafka是json数据 所以引入fastjson进行解析
    //当不确定json内部是否有我们要的key值,我们可以使用filter + containsKey的方式进行过滤
	// Parse each Kafka record as JSON, keep only objects containing the
	// "report" key, then project the fields needed by the sinks below.
	// NOTE(review): only the presence of "report" is checked; x.get("appid")
	// etc. will NPE if those keys are absent — confirm the upstream schema.
	val dataStream = ds
	  .map(x => JSON.parseObject(x))          // raw String -> fastjson JSONObject
	  .filter(x => x.containsKey("report"))   // drop records without "report"
	  .map(x => {
	    val appid  = x.get("appid").toString
	    val userid = x.get("userid").toString
	    val time   = x.get("time").toString
	    // Pick ONE return value depending on the target sink:
	    //   MySQL sink : Reading(appid, userid, time)        (case class)
	    //   Kafka sink : appid + "," + userid + "," + time   (plain string)
	    //   Kudu sink  : the java.util.Map built below
	    // (The original evaluated all three and silently discarded the first
	    // two — dead code removed; the Kudu map was the value actually used.)
	    // SinkKudu is written in Java, so convert the Scala Map to a Java Map.
	    Map("appid" -> appid, "userid" -> userid, "time" -> time).asJava
	  })
	// BUG FIX: Scala's string type is `String` (capital S); lowercase
	// `string` does not exist and fails to compile.
	case class Reading(appid: String, userid: String, time: String)
Sink MySQL
  // Custom MySQL sink: one INSERT per incoming SensorReading.
  class JDBCSink() extends RichSinkFunction[SensorReading] {
    // JDBC connection and prepared statement, created once per task in open()
    var conn: Connection = _
    var insertStmt: PreparedStatement = _

    // Open the connection and precompile the INSERT statement.
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      conn = DriverManager.getConnection("jdbc:mysql://IP:3306/库名", "用户名", "密码")
      insertStmt = conn.prepareStatement("INSERT INTO salary_table (appid, userid, create_time) VALUES (?,?,?)")
    }

    // Bind the three columns and execute the insert for each element.
    override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
      insertStmt.setString(1, value.appid)
      insertStmt.setString(2, value.userid)
      // BUG FIX: the SQL above has exactly 3 placeholders; the original bound
      // parameter index 4, which throws SQLException on every record.
      insertStmt.setString(3, value.create_time)
      insertStmt.execute()
    }

    // Release the statement and connection when the task shuts down.
    override def close(): Unit = {
      insertStmt.close()
      conn.close()
    }
  }
	// Attach the custom MySQL sink and launch the streaming job.
	dataStream.addSink(new JDBCSink())
	// Start execution (blocks until the job terminates)
    env.execute("job")
Sink KUDU
package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

public class SinkKudu extends RichSinkFunction> {


    private KuduClient client;
    private KuduTable table;
    private String kuduMaster;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;


    public SinkKudu(String kuduMaster, String tableName) {
        this.kuduMaster = kuduMaster;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        client = new KuduClient.KuduClientBuilder(kuduMaster).build();
        table = client.openTable(tableName);
        schema = table.getSchema();
        kuduSession = client.newSession();
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
    }


    public void invoke(Map map) {
        if (map == null) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            for (int i = 0; i < columnCount; i++) {
                String value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
            }

            OperationResponse response = kuduSession.apply(insert);
            if (response != null) {
                System.out.println(response.getRowError().toString());
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            client.close();
            os.close();
            out.close();
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    // 插入数据 自动判断kudu类型并转换 图方便只保留以下几种
    private void insertData(PartialRow row, Type type, String columnName, String value) throws IOException {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value);
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value));
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(value));
                    return;
                default:
                    throw new UnsupportedOperationException("Unknown type " + type);
            }
        } catch (Exception e) {
            System.out.println("数据插入异常");
        }
    }
}

	// Wire the stream into the custom Java SinkKudu defined above.
    val kuduMaster = "ip"
    // NOTE: tables created through Impala must be referenced as "impala::db.table"
    val tableInfo = "tablename"

    dataStream.addSink(new SinkKudu(kuduMaster, tableInfo))
    env.execute("flink_kudu_job")
Sink Kafka 不需要自定义sink 原生支持
    // Sink to Kafka — natively supported by Flink, no custom sink required.
    val prop2 = new Properties()
    prop2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.20.7:9092")
    prop2.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
    prop2.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop2.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // BUG FIX: prop2 was configured above but never passed to the producer —
    // the original used the (brokerList, topicId, schema) constructor, so
    // every property (retries, serializers, the intended bootstrap servers)
    // was silently ignored. Use the (topicId, schema, props) constructor.
    dataStream.addSink(new FlinkKafkaProducer[String](
      "topic",
      new SimpleStringSchema(),
      prop2))
    env.execute("event_attendees_ff")
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354525.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号