栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

[大数据技术与应用省赛学习记录十]——模块四:数据采集与实施计算(使用Flink、Scala)

[大数据技术与应用省赛学习记录十]——模块四:数据采集与实施计算(使用Flink、Scala)

要求:使用Flink、Scala消费Kafka中的数据将其进行聚合计算出商城在线人数,将结果存入Redis中。
1.编写Scala文件

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object intoRedis {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "master:9092")
    pro.setProperty("group.id", "test1")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("test1", new SimpleStringSchema(), pro))
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")).map(x => {
      (x, 1)
    }).keyBy(0).sum(1)

    val conf = new FlinkJedisPoolConfig.Builder().setHost("master").setPort(6379).setPassword("123456").build()
    val sink = new RedisSink[(String, Int)](conf, new MyRedisMapper)
    result.addSink(sink)
    result.print()
    env.execute("test1")
  }
}
class MyRedisMapper extends RedisMapper[(String, Int)]() {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)//
  }
  override def getValueFromData(t: (String, Int)) = t._2.toString
  override def getKeyFromData(t: (String, Int)) = t._1
}

2.使用redis统计在线用户人数

[hadoop@master ~]$ cd /software/redis/src/
[hadoop@master src]$ ./redis-server

另起终端

[hadoop@master ~]$ cd /software/redis/src/
[hadoop@master src]$ ./redis-cli
127.0.0.1:6379> 

参照以下内容作出redis代码编写

每当一个用户上线时, 我们就执行 ZADD 命令, 将这个用户以及它的在线时间添加到指定的有序集合中:
ZADD “online_users”
通过使用 ZSCORE 命令检查指定的用户 ID 在有序集合中是否有相关联的分值, 我们可以知道该用户是否在线:
ZSCORE “online_users”
而通过执行 ZCARD 命令, 我们可以知道总共有多用户在线:
ZCARD “online_users”

127.0.0.1:6379> ZADD online_user 1 2021/11/15
(integer) 1
127.0.0.1:6379> ZSCORE online_user 1
(nil)
127.0.0.1:6379> ZCARD online_user
(integer) 1
127.0.0.1:6379> ZADD online_user 1 2021/11/16
(integer) 1
127.0.0.1:6379> ZSCORE online_user 1
(nil)
127.0.0.1:6379> ZCARD online_user
(integer) 2

更多方法可以参考这篇。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584953.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号