栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark读取Hive数据写入Redis

Spark读取Hive数据写入Redis

import java.util

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import redis.clients.jedis.{Jedis, JedisPool}
import redis.clients.util.Pool

import scala.collection.mutable.ArrayBuffer


/**
 * Spark job that reads a story ranking from the Hive table
 * `ads.ads_ks_log_story_play_hsjj_14h_sum_a_d` and mirrors it into the Redis sorted set
 * `EXPLAIN_BOOK_ZS`, using `story_14d_play_cts` as each `story_id`'s score.
 *
 * After a run the sorted set contains exactly the ids returned by today's query.
 */
object GoodBookSpeakWellDateToRedis {

  /** Redis sorted-set key that holds the ranking. */
  private val RankingKey = "EXPLAIN_BOOK_ZS"

  /** Connection pool shared by all Redis helpers; assigned by [[init]]. */
  private[this] var jedisPool: Pool[Jedis] = _

  /**
   * Creates the Redis connection pool. Must be called before any other Redis method.
   *
   * @param host     Redis host name
   * @param port     Redis port
   * @param timeout  Jedis connection/socket timeout (milliseconds, per the Jedis API)
   * @param password Redis password; an empty string or null means "no AUTH"
   */
  def init(host: String, port: Int, timeout: Int, password: String): Unit = {
    // Jedis sends AUTH whenever the password is non-null, so an empty string would make
    // a password-less server reject every connection. Pass null in that case instead.
    val auth = Option(password).filter(_.nonEmpty).orNull
    jedisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout, auth)
  }

  /**
   * Adds `story_id` to the sorted set `key`, or updates its score if already present.
   *
   * @param key                sorted-set key
   * @param story_14d_play_cts score to store
   * @param story_id           member to add/update
   */
  def zadd(key: String, story_14d_play_cts: Int, story_id: String): Unit =
    withJedis(_.zadd(key, story_14d_play_cts.toDouble, story_id))

  /**
   * Removes `member` from the sorted set `key` (no-op if absent).
   *
   * @param key    sorted-set key
   * @param member member to remove
   */
  def zrem(key: String, member: String): Unit =
    withJedis(_.zrem(key, member))

  /**
   * Loads today's ranking from Hive and rewrites the `EXPLAIN_BOOK_ZS` sorted set to match it.
   *
   * - Ids present both in Redis and today get their score refreshed.
   * - Ids only in Redis are removed.
   * - Ids only in today's result are added.
   *
   * Prints the first 20 query rows to stdout as a side effect.
   *
   * @param spark Hive-enabled Spark session
   */
  def getResultToRedis(spark: SparkSession): Unit = {
    val today = loadTodayRanking(spark)
    // Yesterday's members, highest score first.
    val before: util.Set[String] = withJedis(_.zrevrange(RankingKey, 0, -1))
    syncRanking(today, before)
  }

  /** Entry point: builds a Hive-enabled Spark session, syncs the ranking, then releases resources. */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("GoodBookSpeakWellDateToRedis")
      // .master("local[*]")
      .config("spark.driver.allowMultipleContexts", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Redis connection settings; fill these in before deploying.
    val password = ""
    val host = ""
    val port = 6379
    val timeout = 1000

    try {
      init(host, port, timeout, password)
      getResultToRedis(spark)
    } finally {
      // Release the pool and the Spark session even if the sync failed.
      if (jedisPool != null) jedisPool.close()
      spark.stop()
    }
  }

  /**
   * Borrows a connection from the pool, runs `f`, and always returns the connection.
   *
   * @throws IllegalStateException if [[init]] has not been called
   */
  private def withJedis[T](f: Jedis => T): T = {
    if (jedisPool == null) {
      throw new IllegalStateException("init() must be called before using Redis")
    }
    val jedis = jedisPool.getResource
    try f(jedis) finally jedis.close()
  }

  /**
   * Runs the ranking query and collects `(story_id, story_14d_play_cts)` pairs to the driver,
   * in descending score order.
   */
  private def loadTodayRanking(spark: SparkSession): Array[(String, Int)] = {
    val resultDataNew: DataFrame = spark.sql(
      """
        |select story_id
        |      ,cast(story_14d_play_cts as int) as story_14d_play_cts
        |from ads.ads_ks_log_story_play_hsjj_14h_sum_a_d
        |order by story_14d_play_cts desc
        |""".stripMargin)

    resultDataNew.show(20)

    // NOTE(review): the original comments expect ~10 rows, but the query has no LIMIT;
    // everything returned is collected to the driver. getInt(1) throws if the cast yields null.
    val mapRDD: RDD[(String, Int)] =
      resultDataNew.rdd.map(row => (row.getString(0), row.getInt(1)))
    mapRDD.collect()
  }

  /**
   * Rewrites the ranking set so it holds exactly the ids in `today`, with today's scores.
   *
   * @param today  today's `(story_id, score)` pairs
   * @param before members currently stored in Redis, highest score first
   */
  private def syncRanking(today: Array[(String, Int)], before: util.Set[String]): Unit = {
    import scala.collection.JavaConverters._

    val beforeIds: Set[String] = before.asScala.toSet
    val todayIds: Set[String] = today.iterator.map(_._1).toSet

    // Split today's rows into ids already in Redis (score refresh only) and brand-new ids.
    val (kept, added) = today.partition { case (id, _) => beforeIds.contains(id) }
    // Ids that dropped out of today's result, kept in Redis' descending-score order.
    val stale: Vector[String] = before.asScala.iterator.filterNot(todayIds.contains).toVector

    kept.foreach { case (id, plays) => zadd(RankingKey, plays, id) }

    // Keep the original "remove one, add one" ordering (presumably so readers never see a
    // much smaller set mid-update), then drain whichever side is longer so that no stale id
    // survives and no new id is dropped.
    stale.zip(added).foreach { case (oldId, (newId, plays)) =>
      zrem(RankingKey, oldId)
      zadd(RankingKey, plays, newId)
    }
    stale.drop(added.length).foreach(zrem(RankingKey, _))
    added.drop(stale.length).foreach { case (id, plays) => zadd(RankingKey, plays, id) }
  }
}

转载请注明：文章转载自 www.mshxw.com
本文地址：https://www.mshxw.com/it/745436.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号