栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark写入使用pipeline批量写redis

spark写入使用pipeline批量写redis

  override protected def process(df: Dataframe, param: Map[String, Any]): Dataframe = {
    val (redisConfig, keyNameInDF, valueNameInDF, keyPrefix, expiredTime,productName,batchSize) = parseParam(param)
    df.mapPartitions ( partition => {
      val wrapper = JedisClient.getInstance(redisConfig).asInstanceOf[JedisExpandWrapper]

      def forPartition(pipeline:Pipeline,jedis: Jedis): Iterator[Row]  ={
        val rows: Iterator[Row] = partition.map(row => {

          val key = row.getAs[String](keyNameInDF)
          val value = row.getAs[String](valueNameInDF) match {
            case x: String => x
            case null => ""
          }
          pipeline.set(s"${productName}:${keyPrefix}:${key}", value, SetParams.setParams().ex(expiredTime).nx())
          row
        })
        rows
      }

      wrapper.operate({ jedis: Jedis => {
        val pipeline: Pipeline = jedis.pipelined()
        val resultRows = forPartition(pipeline,jedis)

        pipeline.sync()

        resultRows
      }}, "")
    })(RowEncoder(df.schema))
  }
      

读取文件数据(3.5亿),写入redis,redis写入数据量与文件的记录数差好多,是什么问题导致的?

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278451.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号