要求:使用Flink、Scala消费Kafka中的数据将其进行聚合计算出商城在线人数,将结果存入Redis中。
1.编写Scala文件
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object intoRedis {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val pro = new Properties()
pro.setProperty("bootstrap.servers", "master:9092")
pro.setProperty("group.id", "test1")
val stream = env.addSource(new FlinkKafkaConsumer011[String]("test1", new SimpleStringSchema(), pro))
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")).map(x => {
(x, 1)
}).keyBy(0).sum(1)
val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("123456").build()
val sink = new RedisSink[(String, Int)](conf, new MyRedisMapper)
result.addSink(sink)
result.print()
env.execute("test1")
}
}
class MyRedisMapper extends RedisMapper[(String, Int)]() {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.SET)//
}
override def getValueFromData(t: (String, Int)) = t._2.toString
override def getKeyFromData(t: (String, Int)) = t._1
}
2.使用redis统计在线用户人数
[hadoop@master ~]$ cd /software/redis/src/ [hadoop@master src]$ ./redis-server
另起终端
[hadoop@master ~]$ cd /software/redis/src/ [hadoop@master src]$ ./redis-cli 127.0.0.1:6379>
参照以下内容作出redis代码编写
每当一个用户上线时, 我们就执行 ZADD 命令, 将这个用户以及它的在线时间添加到指定的有序集合中:
ZADD “online_users”
通过使用 ZSCORE 命令检查指定的用户 ID 在有序集合中是否有相关联的分值, 我们可以知道该用户是否在线:
ZSCORE “online_users”
而通过执行 ZCARD 命令, 我们可以知道总共有多用户在线:
ZCARD “online_users”
127.0.0.1:6379> ZADD online_user 1 2021/11/15 (integer) 1 127.0.0.1:6379> ZSCORE online_user 1 (nil) 127.0.0.1:6379> ZCARD online_user (integer) 1 127.0.0.1:6379> ZADD online_user 1 2021/11/16 (integer) 1 127.0.0.1:6379> ZSCORE online_user 1 (nil) 127.0.0.1:6379> ZCARD online_user (integer) 2
更多方法可以参考这篇。


![[大数据技术与应用省赛学习记录十]——模块四:数据采集与实施计算(使用Flink、Scala) [大数据技术与应用省赛学习记录十]——模块四:数据采集与实施计算(使用Flink、Scala)](http://www.mshxw.com/aiimages/31/584953.png)
