栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink之Redis的安装及RedisSink的用法

Flink之Redis的安装及RedisSink的用法

maven:


<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

redis是key-value的形式存储。

redis的安装:

编译安装redis到指定的目录下面
下载地址:http://download.redis.io/releases/
1、tar -zxvf redis-3.2.8.tar.gz -C /usr/local/download/
2、安装gcc支持
yum install -y gcc
3、cd /usr/local/download/redis-3.2.8
make PREFIX=/usr/local/software/redis-3.2.8 install
4、创建软连接
ln -s /usr/local/software/redis-3.2.8  /usr/local/software/redis
5、配置环境变量
编辑/etc/profile,在文件最后追加以下两行:
export REDIS_HOME=/usr/local/software/redis
export PATH=$PATH:$REDIS_HOME/bin
6、让环境变量生效
source /etc/profile

启动redis服务:
cd /usr/local/software/redis-3.2.8
redis-server &

查看6379端口是否已处于监听状态:
cd /usr/local/software/redis-3.2.8
netstat -anop | grep 6379

启动redis-cli客户端连接redis服务:
redis-cli -h localhost -p 6379

连接成功后,在redis-cli中执行 set name huitao 写入一条测试数据。

Flink里面使用redis:

package Flink_API;
import java.io.Serializable;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Collector;
public class TestRedis {

        //主要介绍Flink里面Redis的用法
        public static void main(String[] args) throws Exception {
            //创建运行环境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
            //Flink是以数据自带的时间戳字段为准
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //设置并行度
            env.setParallelism(1);

            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.severs","page01:9001");
            consumerProperties.setProperty("grop.id","browsegroup");

            DataStreamSource dataStreamSource=env.addSource(new FlinkKafkaConsumer010("browse_topic", (KeyedDeserializationSchema) new SimpleStringSchema(),consumerProperties));

            DataStream processData=dataStreamSource.process(new ProcessFunction() {
                @Override
                public void processElement(String s, Context context, Collector collector) throws Exception {
                    try{
                        UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);
                        if(browseLog !=null){
                            collector.collect(browseLog);
                        }
                    }catch(Exception e){
                        System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());
                    }
                }
            });

            //每个用户浏览商品最大记录
            DataStream maxData=processData.keyBy("userID").maxBy("productPrice");
            maxData.print();

            //配置redis
            FlinkJedisConfigbase conf=new FlinkJedisPoolConfig.Builder().setHost("192.168.208.200").setPort(6379).build();
            maxData.addSink(new RedisSink<>(conf,new MyRedisMapper()));
            //程序的入口类
            env.execute("TestRedis");

        }

    public static class MyRedisMapper implements RedisMapper {
        
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        
        @Override
        public String getKeyFromData(UserBrowseLog userBrowseLog) {
            return userBrowseLog.getUserID();
        }

        
        @Override
        public String getValueFromData(UserBrowseLog userBrowseLog) {
            return String.valueOf(userBrowseLog.getProductPrice());
        }

    }
    //浏览类
        public static class UserBrowseLog implements Serializable {
            private String userID;
            private String eventTime;
            private String eventType;
            private String productID;
            private Integer productPrice;

            public String getUserID() {
                return userID;
            }

            public void setUserID(String userID) {
                this.userID = userID;
            }

            public String getEventTime() {
                return eventTime;
            }

            public void setEventTime(String eventTime) {
                this.eventTime = eventTime;
            }

            public String getEventType() {
                return eventType;
            }

            public void setEventType(String eventType) {
                this.eventType = eventType;
            }

            public String getProductID() {
                return productID;
            }

            public void setProductID(String productID) {
                this.productID = productID;
            }

            public Integer getProductPrice() {
                return productPrice;
            }

            public void setProductPrice(Integer productPrice) {
                this.productPrice = productPrice;
            }

            @Override
            public String toString() {
                return "UserBrowseLog{" +
                        "userID='" + userID + ''' +
                        ", eventTime='" + eventTime + ''' +
                        ", eventType='" + eventType + ''' +
                        ", productID='" + productID + ''' +
                        ", productPrice=" + productPrice +
                        '}';
            }
        }


    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/761115.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号