override protected def process(df: Dataframe, param: Map[String, Any]): Dataframe = {
  val (redisConfig, keyNameInDF, valueNameInDF, keyPrefix, expiredTime, productName, batchSize) = parseParam(param)
  df.mapPartitions { partition =>
    val wrapper = JedisClient.getInstance(redisConfig).asInstanceOf[JedisExpandWrapper]
    wrapper.operate({ jedis: Jedis =>
      val pipeline: Pipeline = jedis.pipelined()
      // BUG FIX: the original queued SET commands inside a *lazy* Iterator.map and
      // called pipeline.sync() immediately afterwards — i.e. before Spark ever
      // consumed the iterator. At sync time nothing had been queued, and commands
      // queued later (during downstream consumption) were never flushed, so most
      // rows never reached Redis. That is why the Redis key count was far below
      // the file's record count.
      //
      // Fix: process the partition in chunks of `batchSize` (previously parsed but
      // unused) and sync() after each chunk, so every queued command is flushed
      // before the corresponding rows are handed downstream. This also bounds the
      // pipeline's client-side buffer.
      //
      // NOTE(review): this assumes wrapper.operate keeps the Jedis connection open
      // while the returned iterator is consumed — confirm against JedisExpandWrapper.
      partition.grouped(batchSize).flatMap { batch =>
        batch.foreach { row =>
          val key = row.getAs[String](keyNameInDF)
          // getAs may return null for SQL NULL values; normalize to "".
          val value = Option(row.getAs[String](valueNameInDF)).getOrElse("")
          // NOTE: two further reasons the final Redis key count can be smaller than
          // the input row count: duplicate keys collapse into one Redis key (and
          // nx() keeps the first value seen), and ex(expiredTime) lets keys expire.
          pipeline.set(s"${productName}:${keyPrefix}:${key}", value, SetParams.setParams().ex(expiredTime).nx())
        }
        pipeline.sync()
        batch
      }
    }, "")
  }(RowEncoder(df.schema))
}
// Question (translated): we read ~350 million records from a file and write them to
// Redis, but the number of keys that end up in Redis is far below the file's record
// count — what causes the discrepancy? (Answer: see the lazy-iterator/sync() bug,
// nx()/duplicate-key collapsing, and key expiry noted in process() above.)



