栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink写入es(开启x-pack认证)

Flink写入es(开启x-pack认证)

package kafka2flink2es

import java.text.SimpleDateFormat
import java.util.{Properties, UUID}

import caseclass.CaseClass.oLogRunning
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.flink.api.common.functions.{FlatMapFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
 * Flink streaming job: reads JSON arrays of log records from the Kafka topic
 * "o-log-running", converts every array element into an [[oLogRunning]] value,
 * prints it, and indexes it into the Elasticsearch index "o-log-running".
 *
 * The Elasticsearch connection configured here carries no username/password.
 */
object OLogRunning {

  /**
   * Builds the Kafka -> flatMap -> Elasticsearch pipeline and runs it.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    // Set up the streaming environment; the whole job runs with parallelism 1.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Kafka consumer configuration.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.193.150:9092")
    properties.setProperty("group.id", "consumer-group")

    val sourceData: DataStream[String] = env
      .addSource(new FlinkKafkaConsumer[String]("o-log-running", new SimpleStringSchema(), properties))

    // Each Kafka message is expected to be a JSON array; every element becomes one oLogRunning.
    val data: DataStream[oLogRunning] = sourceData.flatMap(new FlatMapFunction[String, oLogRunning] {
      override def flatMap(in: String, out: util.Collector[oLogRunning]): Unit = {
        // parseArray may hand back null (e.g. for a null message); Option turns that
        // into "emit nothing" instead of a NullPointerException on size().
        val parsed: Option[JSONArray] = Option(JSON.parseArray(in))
        parsed.foreach { jsonArray =>
          for (i <- 0 until jsonArray.size()) {
            val jsonObject: JSONObject = jsonArray.getJSONObject(i)
            val timestamp = jsonObject.getString("timestamp")
            val serviceName = jsonObject.getString("serviceName")
            val version = jsonObject.getString("version")
            val nodeIp = jsonObject.getString("nodeIp")
            val logLevel = jsonObject.getString("logLevel")
            val traceId = jsonObject.getString("traceId")
            val message = jsonObject.getString("message")
            val path = jsonObject.getString("path")
            out.collect(
              oLogRunning(timestamp, serviceName, version, nodeIp, logLevel, traceId, message, path)
            )
          }
        }
      }
    })

    data.print()

    // Elasticsearch HTTP endpoints.
    import java.util.ArrayList
    val httpHosts = new ArrayList[HttpHost]
    httpHosts.add(new HttpHost("192.168.193.150", 9200))

    // Sink function that turns one oLogRunning into one index request.
    val myEsSinkFunc = new ElasticsearchSinkFunction[oLogRunning] {
      override def process(t: oLogRunning, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        import java.util.HashMap
        val datamap = new HashMap[String, String]()
        val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        datamap.put("logType", "07")
        datamap.put("status", "01")
        // "time" is the moment the record is written, not the record's own timestamp.
        datamap.put("time", df.format(System.currentTimeMillis()))
        datamap.put("timestamp", t.timestamp)
        datamap.put("serviceName", t.serviceName)
        datamap.put("version", t.version)
        datamap.put("nodeIp", t.nodeIp)
        datamap.put("logLevel", t.logLevel)
        datamap.put("traceId", t.traceId)
        // NOTE(review): toString will throw if message is null — confirm the field is always present.
        datamap.put("message", t.message.toString)
        datamap.put("path", t.path)

        // Build the index request for this document.
        val indexRequest = Requests.indexRequest()
          .index("o-log-running")
          .`type`("readingdata")
          .source(datamap)

        // Hand the request to the indexer, which sends it to Elasticsearch.
        requestIndexer.add(indexRequest)
      }
    }

    val esSinkBuilder = new ElasticsearchSink.Builder[oLogRunning](httpHosts, myEsSinkFunc)

    // Flush after every single action, i.e. each record is sent as soon as it arrives.
    esSinkBuilder.setBulkFlushMaxActions(1)

    data.addSink(esSinkBuilder.build())

    env.execute()
  }
}

以上代码不带 Elasticsearch 用户名密码认证(即未开启 x-pack 认证)。

Flink写入es(带x-pack认证)
关键代码

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679774.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号