jedis api中文文档_CYY941027的博客-CSDN博客_jedis文档
背景:每天批量同步800W左右数据到redis,需覆盖之前的数据
maven 引用:
com.redislabs spark-redis2.3.1-RC1 redis.clients jedis3.1.0
方式一: 大数据量导致redis集群cpu100%
case class Weather(adcode: Int, weather: String, temp:Int, humidity:Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("SparkReadRedis")
.master("local[*]")
.config("spark.redis.host", 连接地址)
.config("spark.redis.port", 端口,默认6379)
.config("spark.redis.auth", 密码) //指定redis密码
.config("spark.redis.db", 库名) //指定redis库,库名1
.getOrCreate()
val weatherSeq = Seq(Weather(111100, "雨",99,80), Weather(111101, "晴",-5,80))
val data = spark.createDataframe(weatherSeq)
data.write
.format("org.apache.spark.sql.redis")
.option("table", "weather")
.option("key.column", "aa")
.mode(SaveMode.Overwrite)
.save()
val loadedDf = spark.read
.format("org.apache.spark.sql.redis")
.option("table", "weather")
.option("key.column", "aa")
.load()
loadedDf.show(false)
}
方式二
可以使用 foreachPartition 设置 每千条数据塞到pipeline,然后提交一次,然后Thread.sleep(1000) 等待一秒钟,可以大幅度降低redis集群的cpu
val data = new util.HashMap[String, String]()
val jedis = new Jedis(redis_host, redis_port.toInt)
jedis.auth(redis_auth) //密码
jedis.select(1) //库
val pipeline = jedis.pipelined()
data.put("grade", "aa") //map塞数据
pipeline.hmset(key, data) //管道塞数据
pipeline.expire(key, 3600 * 50) //设置过期时间
pipeline.sync() //同步
pipeline.close()
jedis.close()
自此未再深入研究



