栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

structured streaming 入门级初使用(一)

structured streaming 入门级初使用(一)

1、准备

导入依赖

		
            org.scala-lang
            scala-library
            2.11.0
        
        
            org.apache.spark
            spark-streaming_2.11
            2.3.2
        
        
            org.apache.spark
            spark-streaming-flume_2.11
            2.3.2
        

        
            com.alibaba
            fastjson
            1.2.76
        

        
            org.apache.spark
            spark-core_2.11
            2.3.2
        
        
        
            org.apache.spark
            spark-streaming-kafka-0-10_2.11
            2.3.2
        
        
        
            org.apache.spark
            spark-sql_2.11
            2.3.2
        
	     
            org.apache.spark
            spark-sql-kafka-0-10_2.11
            2.3.2
        
        
        
            redis.clients
            jedis
            2.9.0
        

        
            org.apache.commons
            commons-lang3
            3.3.2
        
        
        
            com.alibaba
            fastjson
            1.2.76
        
        
     
            org.apache.kafka
            kafka_2.11
            2.3.0
        
        
        
            org.apache.kafka
            kafka-clients
            2.1.0
        
2、从kafka读取数据

注意:

1、structured streaming 读取kafka数据是不需要设置group id的。
2、df.selectExpr()中还可以选择kafka的key等

代码如下

    val conf = new SparkConf().setMaster("local[*]")
    val spark =  SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getName)
      .getOrCreate()
 import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("subscribe", "cache_tmp")//
      .option("startingOffsets", "earliest")
      .load()
    val re = df.selectExpr("CAST(value AS STRING) ")//
      .as[(String)]
3、写入kafka

注意:

1、写入kafka需要做checkpoint(),checkpoint 中保存着偏移量
2、目的 kafka 即使不存在 也会自己创建。

代码如下

   val query = re

      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers",PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("topic","tmp_t")
      .option("checkpointLocation","E:/t_check_2")//生产环境中要放在hdfs哟。
      .start()
    query.awaitTermination()

这里的checkpoint的目录保存这消费的偏移量等等信息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/632695.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号