栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

旅游平台Flink实时项目

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

旅游平台Flink实时项目

目录

0 环境准备:

1 软件环境的搭建

1.1 jdk的安装

 1.2 scala的安装

 1.3 hadoop的安装 

 1.4 KAFKA的安装 

  1.5 elastocsearch的安装

1.6 redis的安装

 1.7 flink的安装

1.8 安装mysql

2 数据介绍

2.1 平台功能

2.2 数据的流程描述

2.3 数据组成

2.3.1 事实数据

2.3.1.1 旅游订单数据

 2.3.1.2 用户行为数据

2.3.1.3维度数据

3 造数

3.1 资源准备

3.1.0pom.xml

3.1.1 其他资源

3.2 开发


0 环境准备:

新建虚拟机->设置静态网络->修改映射配置

新建虚拟机 参考

面向大数据开发的集群之虚拟机搭建(一)_林柚晞的博客-CSDN博客

虚拟机网络+设置静态网+修改映射配置+免密登录 参考

项目0单节点的虚拟机做大数据开发(四万字全)_林柚晞的博客-CSDN博客

创建项目路径

mkdir -p /opt/apps 

mkdir -p /opt/software

mkdir -p /opt/scritps

apps 是安装软件路径的路径

software是把安装包上传到虚拟机的文件

scripts是存放一些可以自动化安装软件的脚本

1 软件环境的搭建

先说明一下:就是把安装包扔到指定的位置是使用远程工程工具的。这边省略了把安装包上传到服务器。

自动化安装脚本的总结

-设置安装路径和安装包的位置,配置环境变量的位置

-判断安装路径的前驱路径是否存在

-cat追加文件路径到文件里面(比如修改环境变量,覆盖配置文件)

-正式安装步骤,解压缩等

1.1 jdk的安装

安装包:

链接:https://pan.baidu.com/s/1R5LaDIasYh4vpY_RUB_Qhw?pwd=tff2
提取码:tff2
--来自百度网盘超级会员V2的分享

cd /opt/scripts/

vi install_jdk.sh

#!/bin/bash

# author : XXX
# vesion : 1.0
# date : 2022-04-24
# desc : 自动安装jdk
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
JDK_TAR=/opt/software/jdk-8u45-linux-x64.tar.gz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将jdk-8u45-linux-x64.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${JDK_TAR} ]; then
    usage    
    exit 0
fi

# 已经安装过了
if [ -e  ${JAVA_DIR} ]; then
    echo "${JAVA_DIR}路径已经存在,JDK已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${JAVA_DIR} ]; then
    mkdir -p ${JAVA_DIR}
    echo "初始化目录:${JAVA_DIR}"
fi

## 解压JDK的tar包
tar -zxvf ${JDK_TAR} -C ${INSTALL_PREFIX}

## 配置环境变量
cat << EOF >> ${ETC_PROFILE}
export JAVA_HOME=${JAVA_DIR}
export CLASS_PATH=.:${JAVA_DIR}/lib/dt.jar:${JAVA_DIR}/lib/tool.jar
export PATH=$PATH:${JAVA_DIR}/bin
EOF

## 提示成功
echo "install jdk successful!!!"

 chmod +x ./install_jdk.sh

./install_jdk.sh

source /etc/profile

java -version

 1.2 scala的安装

安装包:

链接:https://pan.baidu.com/s/1oBbplZTIem1K0g4Wv_PD5Q?pwd=dzcb
提取码:dzcb
--来自百度网盘超级会员V2的分享

vi /opt/scripts/install_scala.sh

 #!/bin/bash

# author :  XXX
# vesion : 1.0
# date : 2022-04-24
# desc : 自动安装scala
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
SCALA_DIR=${INSTALL_PREFIX}/scala-2.11.8
SCALA_TAR=/opt/software/scala-2.11.8.tgz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将scala-2.11.8.tgz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${SCALA_TAR} ]; then
    usage    
    exit 0
fi

# 已经安装过了
if [ -e  ${SCALA_DIR} ]; then
    echo "${SCALA_DIR}路径已经存在,JDK已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${SCALA_DIR} ]; then
    mkdir -p ${SCALA_DIR}
    echo "初始化目录:${SCALA_DIR}"
fi

## 解压JDK的tar包
tar -zxvf ${SCALA_TAR} -C ${INSTALL_PREFIX}

## 配置环境变量
cat << EOF >> ${ETC_PROFILE}
export SCALA_HOME=${SCALA_DIR}
export PATH=$PATH:${SCALA_DIR}/bin
EOF

## 提示成功
echo "install SCALA successful!!!"

 chmod +x ./install_scala.sh

./install_scala.sh

source /etc/profile

scala -version

 1.3 hadoop的安装 

安装包:

链接:https://pan.baidu.com/s/1O-E4B1MUZjA90rsvEi1nwQ?pwd=q0ey
提取码:q0ey
--来自百度网盘超级会员V2的分享

vi /opt/scripts/install_hadoop.sh

##1. install_hadoop.sh
#!/bin/bash

# author : lixi
# vesion : 1.0
# date : 2022-04-24
# desc : 自动安装hadoop
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
HADOOP_DIR=${INSTALL_PREFIX}/hadoop-2.8.1
HADOOP_TAR=/opt/software/hadoop-2.8.1.tar.gz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将hadoop-2.8.1.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${HADOOP_TAR} ]; then
    usage    
    exit 0
fi

# 已经安装过了
if [ -e  ${HADOOP_DIR} ]; then
    echo "${HADOOP_DIR}路径已经存在,Hadoop已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${HADOOP_DIR} ]; then
    mkdir -p ${HADOOP_DIR}
    echo "初始化目录:${HADOOP_DIR}"
fi

## 解压JDK的tar包
tar -zxvf ${HADOOP_TAR} -C ${INSTALL_PREFIX}

## 配置Hadoop
## hadoop-env.sh

## core-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/core-site.xml


    
        fs.defaultFS
        hdfs://192.168.10.101:9000
    

    
        hadoop.tmp.dir
        ${HADOOP_DIR}/hdpdata
    

    
        hadoop.proxyuser.root.hosts
        *
    

    
        hadoop.proxyuser.root.groups
        *
    


EOF

## hdfs-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/hdfs-site.xml


    
        fs.replication
        1
    

    
        dfs.http.address
        qianfeng01:50070
    

    
        dfs.secondary.http.address
        qianfeng01:50090
    

    
        dfs.namenode.name.dir
        ${HADOOP_DIR}/hdpdata/dfs/name
    

    
        dfs.datanode.data.dir
        ${HADOOP_DIR}/hdpdata/dfs/data
    

    
        dfs.checkpoint.dir
        ${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname
    

    
        dfs.checkpoint.edits.dir
        ${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname
    

    
        dfs.permissions
        false
    


EOF
## yarn-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/yarn-site.xml


    
        yarn.nodemanager.aux-services
        mapreduce_shuffle
    

    
        yarn.resourcemanager.hostname
        qianfeng01
    

    
        yarn.resourcemanager.address
        qianfeng01:8032
    

    
        yarn.resourcemanager.scheduler.address
        qianfeng01:8030
    


EOF
## mapred-site.xml
mv ${HADOOP_DIR}/etc/hadoop/mapred-site.xml.template ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/mapred-site.xml

   
        mapreduce.framework.name
        yarn
    

    
        mapreduce.jobhistory.address
        qianfeng01:10020
    

    
        mapreduce.jobhistory.webapp.address
        qianfeng01:19888
    


EOF

## slaves
cat << EOF > ${HADOOP_DIR}/etc/hadoop/slaves
qianfeng01
EOF

## 配置环境变量
cat << EOF >> ${ETC_PROFILE}
export HADOOP_HOME=${HADOOP_DIR}
export PATH=$PATH:${HADOOP_DIR}/bin:${HADOOP_DIR}/sbin
EOF

## 格式化
${HADOOP_DIR}/bin/hdfs namenode -format

## 提示成功
echo "install hadoop successful!!!"

chmod +x ./install_hadoop.sh

./install_hadoop.sh

source /etc/profile

hadoop version

cd /opt/apps/hadoop-2.8.1/etc/hadoop/hadoop-env.sh

就是找到JAVA_HOME

JAVA_HOME=/opt/apps/jdk1.8.0_45

start-all.sh

 1.4 KAFKA的安装 

安装包:链接:https://pan.baidu.com/s/1abH6Nn4mD9MR7v5r_dJvBg?pwd=qfz8
提取码:qfz8
--来自百度网盘超级会员V2的分享

 tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/

vi /etc/profile

export KAFKA_HOME=/opt/apps/kafka_2.11-1.1.1

export PATH=$PATH:$KAFKA_HOME/bin

vi /opt/apps/kafka_2.11-1.1.1/config/server.properties

下面是需要修改的地方

broker.id=1

advertised.listeners=PLAINTEXT://192.168.10.101:9092

log.dirs=/opt/apps/kafka_2.11-1.1.1/log/kafka-logs

zookeeper.connect=qianfeng01:2181/kafka

 vi /opt/apps/kafka_2.11-1.1.1/config/producer.properties

 bootstrap.servers=qianfeng01:9092

 vi /opt/apps/kafka_2.11-1.1.1/config/consumer.properties 

 bootstrap.servers=qianfeng01:9092

  vi /opt/apps/kafka_2.11-1.1.1/config/zookeeper.properties

 dataDir=/opt/apps/kafka_2.11-1.1.1/zkData

 [root@hadoop kafka_2.11-1.1.1]# zookeeper-server-start.sh -daemon config/zookeeper.properties

zookeeper-shell.sh qianfeng01:2181

[root@hadoop kafka_2.11-1.1.1]# kafka-server-start.sh -daemon config/server.properties

jps

 kafka-topics.sh --create --zookeeper qianfeng01:2181/kafka --replication-factor 1 --partitions 2 --topic test 

 kafka-topics.sh --list --zookeeper qianfeng01:2181/kafka

  1.5 elastocsearch的安装

安装包:

链接:https://pan.baidu.com/s/1h3C3kneHlLuZxXg8KKzK2g?pwd=y35e
提取码:y35e
--来自百度网盘超级会员V2的分享

 useradd hadoop

 passwd hadoop

使用root账号

 [root@hadoop software]# tar -zxvf elasticsearch-6.5.3.tar.gz -C /opt/apps/

[root@hadoop elasticsearch-6.5.3]# vi /etc/profile
export ELASTICSEARCH_HOME=/opt/apps/elasticsearch-6.5.3
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin

配置

cd /opt/apps/elasticsearch-6.5.3/config/elasticsearch.yml

就是我电脑主机号是qianfeng01(其实我也很想改,但是电脑映射习惯了,改了映射没用了。为了妥协只能一直用了) 涉及到qianfeng01大家就改成自己的主机号哦

cluster.name: qianfeng01

node.name: qianfeng01
node.master: true
node.data: true

path.data: /opt/apps/elasticsearch-6.5.3/data

path.logs: /opt/apps/elasticsearch-6.5.3/logs

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["qianfeng01"]

给普通用户授权

[root@hadoop apps]# ll
total 16
drwxr-xr-x  8 root root 4096 Dec  7  2018 elasticsearch-6.5.3
drwxrwxr-x 11  500  500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x  8   10  143 4096 Apr 11  2015 jdk1.8.0_45
drwxr-xr-x  9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
[root@hadoop apps]# chown -R hadoop:hadoop elasticsearch-6.5.3/
[root@hadoop apps]# ls
elasticsearch-6.5.3  hadoop-2.8.1  jdk1.8.0_45  kafka_2.11-1.1.1
[root@hadoop apps]# ll
total 16
drwxr-xr-x  8 hadoop hadoop 4096 Dec  7  2018 elasticsearch-6.5.3
drwxrwxr-x 11    500    500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x  8     10    143 4096 Apr 11  2015 jdk1.8.0_45
drwxr-xr-x  9 root   root   4096 Apr 24 14:32 kafka_2.11-1.1.1

##5.2 给与普通用户root借用权限
[root@hadoop apps]# vi /etc/sudoers
## Allow root to run any commands anywhere
root    ALL=(ALL)       ALL
hadoop    ALL=(ALL)       ALL

切换普通用户

su hadoop

elasticsearch

我遇到的问题啊

、max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]_林柚晞的博客-CSDN博客

第二次安装还遇到了一个问题

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

修改方法

切换root用户

sysctl -w vm.max_map_count=262144

查看

sysctl -a|grep vm.max_map_count

如果显示

vm.max_map_count = 262144

就是成功了

就把账号切换成root,然后reboot一下系统

系统启动之后

su hadoop

elasticsearch -d

查看是否启动成功

jps

curl -XGET qianfeng01:9200

 

就是es报错如何修改

就是切换到普通用户,直接敲elasticsearch

如果是没有报错卡住了就是前台进程,就顺利了

如果要访问web端,就必须把虚拟机中的防火墙关闭

我们再搞一搞插件工具

安装包:

链接:https://pan.baidu.com/s/1-3eWVFmar03xc2RPz-rwRQ?pwd=dklo
提取码:dklo
--来自百度网盘超级会员V2的分享

这个安装包随便解压到一个文件夹下面,只要自己能找到就可以。

打开谷歌浏览器(好久没用过谷歌浏览器了,主要是不能下载插件了,所以浏览器过于简陋,连个常用表单都没有)

 

关掉上面的弹窗

打开谷歌浏览器的时候,看见搜索框旁边的小插件图标就可以看见es的快捷访问啦

就是我这边还有个bug没搞好,集群没连上

出现上面那个集群健康值未连接,是因为服务器里面的防火墙开了。

systemctl stop firewalld.service

查看状态

systemctl status firewalld.service

 看一下web端

1.6 redis的安装

安装包:

链接:https://pan.baidu.com/s/1VoBQEeBBovSWQMTAm7T8mw?pwd=teeq
提取码:teeq
--来自百度网盘超级会员V2的分享

[root@hadoop software]# tar -zxvf redis-4.0.11.tar.gz -C /opt/apps/

[root@hadoop redis-4.0.11]# yum -y install gcc-c++

[root@hadoop redis-4.0.11]# make
[root@hadoop redis-4.0.11]# make PREFIX=/opt/apps/redis-4.0.11 install
[root@hadoop redis-4.0.11]# mkdir etc
[root@hadoop redis-4.0.11]# cp redis.conf etc/
[root@hadoop redis-4.0.11]# cd bin/
[root@hadoop bin]# cp redis-benchmark redis-server redis-cli /usr/bin/

 搞配置

 vi /opt/apps/redis-4.0.11/etc/redis.conf

注意都是解除注释,而不是全删啊。

daemonize yes
loglevel verbose

requirepass root

bind 192.168.10.101

就是如果  requirepass 修改会连不到redis服务器,那就注释掉密码试一试。密码我都不想配置了,实在麻烦

[root@hadoop ~]# redis-server /opt/apps/redis-4.0.11/etc/redis.conf
[root@hadoop ~]# ps -ef | grep redis
root     28066     1  0 16:07 ?        00:00:00 redis-server 192.168.10.101:6379
[root@hadoop ~]# redis-cli -h 192.168.10.101
192.168.10.101:6379> SHUTDOWN
(error) NOAUTH Authentication required.
192.168.10.101:6379> AUTH root
OK
192.168.10.101:6379> SHUTDOWN
not connected> exit
[root@hadoop ~]# ps -ef | grep redis
root     28835  1936  0 16:09 pts/0    00:00:00 grep --color=auto redis

redis的开机脚本

[root@hadoop redis-4.0.11]# vi redis

 #!/bin/bash
#chkconfig: 2345 80 90
PATH=/usr/bin:/usr/local/bin:/sbin:/bin
REDIS_PORT=6379
EXEC=/opt/apps/redis-4.0.11/bin/redis-server
REDIS_CLI=/opt/apps/redis-4.0.11/bin/redis-cli

PIDFILE=/var/run/redis.pid
CONF=/opt/apps/redis-4.0.11/etc/redis.conf

case "$1" in
    start)
        if [ -f ${PIDFILE} ];then
            echo "${PIDFILE} exists, process is already running or crashed"
        else
            echo "Starting Redis Server ...."
            ${EXEC} ${CONF}
        fi
        if [ "$?" = "0" ];then
            echo "Redis is running"
        fi
        ;;
    stop)
        if [ ! -e ${PIDFILE} ];then
            echo "${PIDFILE} does not exists, process is not running"
        else
            PID=$(cat ${PIDFILE})
            echo "Stopping..."
            ${REDIS_CLI} -p ${REDIS_PORT} AUTH root
            ${REDIS_CLI} -p ${REDIS_PORT} SHUTDOWN
            while [ -x ${PIDFILE} ]
            do
                echo "Waiting for Redis shutdown..."
                sleep 1
            done
            echo "Redis stoped"
        fi
        ;;
    restart|force-reload)
            ${0} stop
            ${0} start
        ;;
        *)
            echo "Usage: /etc/init.d/redis {start|stop|restart|force-reload}" >&2
            exit 1
esac

 [root@hadoop redis-4.0.11]# cp redis /etc/init.d/

[root@hadoop redis-4.0.11]# chmod +x /etc/init.d/redis

##3. 查看自启动的所有程序

[root@hadoop redis-4.0.11]# chkconfig --list

##4. 将redis添加到自启动列表
[root@hadoop redis-4.0.11]# chkconfig --add redis
[root@hadoop redis-4.0.11]# chkconfig --level 2345 redis on

##5. 启动关闭测试
[root@hadoop redis-4.0.11]# systemctl start redis
[root@hadoop redis-4.0.11]# service redis start

 1.7 flink的安装

安装包:

链接:https://pan.baidu.com/s/1d6tMAwfLuC47TgAsClnY3w?pwd=k1h9
提取码:k1h9
--来自百度网盘超级会员V2的分享

安装standalone

##1. 解压
[root@hadoop software]# tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /opt/apps/

##2. 配置环境变量
[root@hadoop flink-1.9.1]# vi /etc/profile


export FLINK_HOME=/opt/apps/flink-1.9.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin:$FLINK_HOME/bin

[root@hadoop flink-1.9.1]# source /etc/profile

配置

[root@hadoop flink-1.9.1]# vi conf/flink-conf.yaml

jobmanager.rpc.address: qianfeng01

jobmanager.rpc.port: 6123

rest.port: 8081

rest.address: qianfeng01

 ##4. 配置从机文件
[root@hadoop flink-1.9.1]# vi conf/slaves

 qianfeng01

##5. 配置主机文件
[root@hadoop flink-1.9.1]# vi conf/masters 

qianfeng01:8081

 ##6. 先启动hdfs
[root@hadoop flink-1.9.1]# start-dfs.sh
[root@hadoop flink-1.9.1]# start-cluster.sh

web端访问:

http://192.168.10.101:8081/

测试,可以不测试

##8. 测试Flink
##8.1 A窗口
[root@hadoop flink-1.9.1]# yum -y install nc
[root@hadoop flink-1.9.1]# nc -l 8000

111
222
33
444
555
33
111
222

##8.2 B窗口
[root@hadoop flink-1.9.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 8000

[root@hadoop flink-1.9.1]# tail -f log/flink-root-taskexecutor-0-hadoop.out
111 : 1
555 : 1
444 : 1
33 : 1
222 : 1
33 : 1
222 : 1
111 : 1

##9. 提交jar
[root@hadoop flink-1.9.1]# flink run examples/batch/WordCount.jar --input /etc/profile --output /home/output/00
Starting execution of program
Program execution finished
Job with JobID ca414928ddcfd57969326827f5cb7205 has finished.
Job Runtime: 21670 ms


[root@hadoop flink-1.9.1]# tail -f /home/output/00
want 1
we 1
what 1
wide 1
will 1
workaround 1
x 1
you 3
your 1
z 1

1.8 安装mysql

安装包:

链接:https://pan.baidu.com/s/1qi2i9r7loMj3Y04LsMTc6w?pwd=4u1e
提取码:4u1e
--来自百度网盘超级会员V2的分享

##1. 安装引导
[root@hadoop software]# yum -y localinstall mysql-community-release-el6-5.noarch.rpm
##2. 安装mysql
[root@hadoop software]# yum -y install mysql-server
##3. 启动mysql的服务
[root@hadoop software]# service mysqld start
##4. 设置初始root账户密码
[root@hadoop software]# mysqladmin -uroot password '123456'
##5. 登陆mysql服务
[root@hadoop software]# mysql -uroot -p123456
##6. 远程授权
mysql> grant all privileges on *.* to root@"%" identified by "123456" with grant option;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
##7. 就是可以使用mysql的client去远程链接mysql的服务了

2 数据介绍

2.1 平台功能
功能模块说明
核心业务基于核心业务数据(包括旅游订单、酒店住宿、车票业务)的相关实时计算、实时展示
用户行为日志基于产品设计的各种埋点
风控报警用户异常行为进行监控和报警

本项目以大数据实时平台为目标

技术方向说明
实时的ETL实时的清洗、转换为规范的格式化的数据
实时的数据统计实现各种实时的统计指标
实时的数据存储实时数据落地持久化、交互搜索、动态计算提供技术支持
规则处理主要是服务实时风控、报警相关需求
交互式查询交互式查询明细数据或实时的聚合数据(Clickhouse、Apache Druid)
实时数据展示主要是服务于数据使用方,提供更直观的数据展示形式

2.2 数据的流程描述

实时的旅游平台,说明数据一定会经过消息中间件(消息队列/消息通道)。原始的数据源通常都是通过消息通道来完成数据的采集。随后根据实际情况将消息通道中的数据存储在各种不同的分布式存储介质中。最终统计存储在分布式介质中的数据,也可以做成BI类型可视化展示。

2.3 数据组成

2.3.1 事实数据

2.3.1.1 旅游订单数据

{
    "userID":"1986",
    "user_mobile":"15549477595",
    "product_id":"12312312",
    "product_traffic":"01",
    "product_traffic_grade":"11",
    "product_traffic_type" : "1",
    "product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
    "user_region" : "12312",
    "travel_memeber_adult" : "2",
    ...
}

 userID = 用户唯一编号
user_mobile = 用户手机号码
product_id = 旅游这个产品编号
product_traffic = 旅游的交通选择
product_traffic_grade = 坐席
product_traffic_type = 行程种类
product_pub = 旅游住宿选择
user_region = 所在区域
travel_memeber_adult = 本次旅游的成年人的人数
travel_memeber_younger = 本次旅游的儿童的人数
travel_member_baby = 本次旅游的婴儿的人数
product_price = 产品原价格
has_activity = 0表示五活动价格 | 0.8表示打八折
product_fee = 产品价格
order_id = 旅游订单
order_ct = 下单时间

 2.3.1.2 用户行为数据

行为类型:action: 'launch(启动) | interactive(交互)| page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType:  'view(浏览) | slide(滑动)| click:评论、点赞、分享(点击)  '
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct

 {
    "os":"1",
    "longitude":"115.12321",
    "latitude":"26.88282",
    "userRegion":"12312",
    "userID":"12312",
    "manufacturer":"09",
    "userRegionIP":"10.206.0.4",
    "action":"08",
    "eventType":"01"
    ...
}

2.3.1.3维度数据
  • 旅游产品维度表

CREATE TABLE travel.dim_product(
    product_id text NULL COMMENT '旅游产品的编号',
    product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
    product_type text  NULL COMMENT '旅游产品的类型',
    departure_code text  NULL COMMENT '旅游产品的出发地编码',
    des_city_code text  NULL COMMENT '旅游产品的目的地编码',
    tourim_ticket_type text  NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  • 酒店维度表

CREATE TABLE travel.dim_pub(
    pub_id text NULL COMMENT '酒店的编号',
    pub_name text NULL COMMENT '酒店的名称',
    pub_stat text  NULL COMMENT '酒店的星级',
    pub_grade text  NULL COMMENT '酒店的等级编码',
    pub_grade_desc text  NULL COMMENT '酒店的等级编码描述',
    pub_area_code text  NULL COMMENT '酒店所在区域编码',
    pub_address text  NULL COMMENT '酒店地址',
    is_national text  NULL COMMENT '是否属于境内',
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  • 导入维度数据到mysql

sql文件(因为文件比较大,然后需要自己下载到本地,使用navicat远程连接服务器上的mysql)

链接:https://pan.baidu.com/s/1dfCxtrmgdCNnFeCfM_cDdQ?pwd=btl3
提取码:btl3
--来自百度网盘超级会员V2的分享

打开一下服务器

打开windows上面的navicat

 

[root@qianfeng01 ~]# mysql -uroot -p123456

 

下面就开始api开发吧

3 造数

打开idea,创建一个maven项目

3.1 资源准备

3.1.0pom.xml


    4.0.0

    com.qf.bigdata
    qfdata
    1.0

    
        UTF-8
        UTF-8
        1.8

        1.1.8
        1.7.22
        1.2.16

        1.2.29
        2.4.0
        2.8.5
        2.9.3
        21.0
        5.1.44
        3.5.1
        3.4
        2.4
        1.6

        1.8.1
        1.8.1

        
        2.8.1
        0.11
        1.1.1
        3.0.3

        compile
        
    


    

        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
            ${scope}
        

        
            log4j
            log4j
            ${log4j.version}
            ${scope}
        

        
            org.slf4j
            jcl-over-slf4j
            ${slf4j.version}
            ${scope}
        

        
            ch.qos.logback
            logback-classic
            ${logback.version}
            ${scope}
        

        
            ch.qos.logback
            logback-core
            ${logback.version}
            ${scope}
        

        
            ch.qos.logback
            logback-access
            ${logback.version}
            ${scope}
        


        
        
            com.google.guava
            guava
            ${guava.version}
            ${scope}
        

        
        
            com.alibaba
            fastjson
            ${fastjson.version}
            ${scope}
        


        
            com.google.code.gson
            gson
            ${gson.version}
        


        
            com.jayway.jsonpath
            json-path
            ${json-path.version}
            ${scope}
        

        
        
            mysql
            mysql-connector-java
            ${mysql.jdbc.version}
            ${scope}
        


        
        
            com.google.protobuf
            protobuf-java
            ${protobuf.version}
            ${scope}
        


        
        
            org.apache.commons
            commons-csv
            ${commons-csv.version}
            ${scope}
        

        
            org.apache.commons
            commons-lang3
            ${commons-lang3.version}
            ${scope}
        

        
            commons-io
            commons-io
            ${commons-io.version}
            ${scope}
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${scope}
            
                
                    protobuf-java
                    com.google.protobuf
                
            
        

        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${scope}
            
                
                    jsp-api
                    javax.servlet.jsp:
                
                
                    org.slf4j
                    log4j-over-slf4j
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        

        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${scope}
        

        
        
            com.101tec
            zkclient
            ${zk.client.version}
            ${scope}
        

        
        
            org.apache.kafka
            kafka_2.11
            ${kafka.version}
            ${scope}
        

        
            org.apache.kafka
            kafka-clients
            ${kafka.version}
            ${scope}
        

        
        
            com.esotericsoftware
            kryo
            ${kryo.version}
            ${scope}
        

        

        


        
        
            org.apache.avro
            avro
            ${avro.version}
        

        
        
            org.apache.parquet
            parquet-avro
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-common
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-tools
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-hadoop-bundle
            ${parquet.version}
            ${scope}
        




    


    
        
            dev
            
                true
            
            
                
                    
                        ${project.basedir}/src/main/resources
                        true
                    
                
                
                    
                        org.apache.maven.plugins
                        maven-shade-plugin
                    
                
            
        
    


    
        target/classes
        target/test-classes

        
            
                
                    org.apache.maven.plugins
                    maven-shade-plugin
                    3.1.0
                    
                        
                        
                            TravelCurLogJob
                            package
                            
                                shade
                            
                            
                                true
                                TravelCurLogJob
                                
                                    
                                        *:*
                                        
                                            META-INF
    public static QParameterTool fromArgs(String[] args) {
        final Map map = new HashMap<>(args.length / 2);

        int i = 0;
        while (i < args.length) {
            final String key;

            if (args[i].startsWith("--")) {
                key = args[i].substring(2);
            } else if (args[i].startsWith("-")) {
                key = args[i].substring(1);
            } else {
                throw new IllegalArgumentException(
                        String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
                                Arrays.toString(args), args[i]));
            }

            if (key.isEmpty()) {
                throw new IllegalArgumentException(
                        "The input " + Arrays.toString(args) + " contains an empty argument");
            }

            i += 1; // try to find the value

            if (i >= args.length) {
                map.put(key, NO_VALUE_KEY);
            } else if (NumberUtils.isNumber(args[i])) {
                map.put(key, args[i]);
                i += 1;
            } else if (args[i].startsWith("--") || args[i].startsWith("-")) {
                // the argument cannot be a negative number because we checked earlier
                // -> the next argument is a parameter name
                map.put(key, NO_VALUE_KEY);
            } else {
                map.put(key, args[i]);
                i += 1;
            }
        }

        return fromMap(map);
    }

    
    public static QParameterTool fromPropertiesFile(String path) throws IOException {
        File propertiesFile = new File(path);
        return fromPropertiesFile(propertiesFile);
    }

    
    public static QParameterTool fromPropertiesFile(File file) throws IOException {
        if (!file.exists()) {
            throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
        }
        try (FileInputStream fis = new FileInputStream(file)) {
            return fromPropertiesFile(fis);
        }
    }

    
    public static QParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
        Properties props = new Properties();
        props.load(inputStream);
        return fromMap((Map) props);
    }

    
    public static QParameterTool fromMap(Map map) {
        return new QParameterTool(map);
    }

    
    public static QParameterTool fromSystemProperties() {
        return fromMap((Map) System.getProperties());
    }

    // ------------------ QParameterTool  ------------------------
    public  Map data = null;

    // data which is only used on the client and does not need to be transmitted
    protected transient Map defaultData;
    protected transient Set unrequestedParameters;

    private QParameterTool(Map data) {
        this.data = Collections.unmodifiableMap(new HashMap<>(data));

        this.defaultData = new ConcurrentHashMap<>(data.size());

        this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));

        unrequestedParameters.addAll(data.keySet());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        QParameterTool that = (QParameterTool) o;
        return Objects.equals(data, that.data) &&
                Objects.equals(defaultData, that.defaultData) &&
                Objects.equals(unrequestedParameters, that.unrequestedParameters);
    }

    @Override
    public int hashCode() {
        return Objects.hash(data, defaultData, unrequestedParameters);
    }

    
    public Set getUnrequestedParameters() {
        return Collections.unmodifiableSet(unrequestedParameters);
    }

    // ------------------ Get data from the util ----------------

    
    public int getNumberOfParameters() {
        return data.size();
    }

    
    public String get(String key) {
        addToDefaults(key, null);
        unrequestedParameters.remove(key);
        return data.get(key);
    }

    
    public String getRequired(String key) {
        addToDefaults(key, null);
        String value = get(key);
        if (value == null) {
            throw new RuntimeException("No data for required key '" + key + "'");
        }
        return value;
    }

    
    public String get(String key, String defaultValue) {
        addToDefaults(key, defaultValue);
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return value;
        }
    }

    
    public boolean has(String value) {
        addToDefaults(value, null);
        unrequestedParameters.remove(value);
        return data.containsKey(value);
    }

    // -------------- Integer

    
    public int getInt(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Integer.parseInt(value);
    }

    
    public int getInt(String key, int defaultValue) {
        addToDefaults(key, Integer.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Integer.parseInt(value);
    }

    // -------------- LONG

    
    public long getLong(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Long.parseLong(value);
    }

    
    public long getLong(String key, long defaultValue) {
        addToDefaults(key, Long.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        }
        return Long.parseLong(value);
    }

    // -------------- FLOAT

    
    public float getFloat(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Float.valueOf(value);
    }

    
    public float getFloat(String key, float defaultValue) {
        addToDefaults(key, Float.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Float.valueOf(value);
        }
    }

    // -------------- DOUBLE

    
    public double getDouble(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Double.valueOf(value);
    }

    
    public double getDouble(String key, double defaultValue) {
        addToDefaults(key, Double.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Double.valueOf(value);
        }
    }

    // -------------- BOOLEAN

    
    public boolean getBoolean(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Boolean.valueOf(value);
    }

    
    public boolean getBoolean(String key, boolean defaultValue) {
        addToDefaults(key, Boolean.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Boolean.valueOf(value);
        }
    }

    // -------------- SHORT

    
    public short getShort(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Short.valueOf(value);
    }

    
    public short getShort(String key, short defaultValue) {
        addToDefaults(key, Short.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Short.valueOf(value);
        }
    }

    // -------------- BYTE

    
    public byte getByte(String key) {
        addToDefaults(key, null);
        String value = getRequired(key);
        return Byte.valueOf(value);
    }

    
    public byte getByte(String key, byte defaultValue) {
        addToDefaults(key, Byte.toString(defaultValue));
        String value = get(key);
        if (value == null) {
            return defaultValue;
        } else {
            return Byte.valueOf(value);
        }
    }

    // --------------- Internals

    protected void addToDefaults(String key, String value) {
        final String currentValue = defaultData.get(key);
        if (currentValue == null) {
            if (value == null) {
                value = DEFAULT_UNDEFINED;
            }
            defaultData.put(key, value);
        } else {
            // there is already an entry for this key. Check if the value is the undefined
            if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) {
                // update key with better default value
                defaultData.put(key, value);
            }
        }
    }

    // ------------------------- Export to different targets -------------------------


    
    public Properties getProperties() {
        Properties props = new Properties();
        props.putAll(this.data);
        return props;
    }

    
    public void createPropertiesFile(String pathToFile) throws IOException {
        createPropertiesFile(pathToFile, true);
    }

    
    public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
        final File file = new File(pathToFile);
        if (file.exists()) {
            if (overwrite) {
                file.delete();
            } else {
                throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed");
            }
        }
        final Properties defaultProps = new Properties();
        defaultProps.putAll(this.defaultData);
        try (final OutputStream out = new FileOutputStream(file)) {
            defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()");
        }
    }

    protected Object clone() throws CloneNotSupportedException {
        return new QParameterTool(this.data);
    }

    // ------------------------- Interaction with other ParameterUtils -------------------------

    
    public QParameterTool mergeWith(QParameterTool other) {
        final Map resultData = new HashMap<>(data.size() + other.data.size());
        resultData.putAll(data);
        resultData.putAll(other.data);

        final QParameterTool ret = new QParameterTool(resultData);

        final HashSet requestedParametersLeft = new HashSet<>(data.keySet());
        requestedParametersLeft.removeAll(unrequestedParameters);

        final HashSet requestedParametersRight = new HashSet<>(other.data.keySet());
        requestedParametersRight.removeAll(other.unrequestedParameters);

        ret.unrequestedParameters.removeAll(requestedParametersLeft);
        ret.unrequestedParameters.removeAll(requestedParametersRight);

        return ret;
    }

    // ------------------------- ExecutionConfig.UserConfig interface -------------------------

    public Map toMap() {
        return data;
    }

    // ------------------------- Serialization ---------------------------------------------

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();

        defaultData = new ConcurrentHashMap<>(data.size());
        unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
    }


}

package com.qf.bigdata.realtime.util;

import com.qf.bigdata.realtime.constant.CommonConstant;
import com.qf.bigdata.realtime.dvo.RegionDo;
import com.qf.bigdata.realtime.util.CSVUtil;
import com.qf.bigdata.realtime.util.QParameterTool;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.LdapGroupsMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Int;

import java.io.Serializable;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TravelOrderHelper implements Serializable {
 //日志管理工具
    private static final Logger log = LoggerFactory.getLogger(TravelOrderHelper.class);
    
    public static final String KEY_KAFKA_ID = "KAFKA_ID";
    public static final String KEY_ORDER_ID = "order_id";
    public static final String KEY_USER_ID = "userid";
    public static final String KEY_USER_MOBILE = "user_mobile";
    public static final String KEY_PRODUCT_ID = "product_id";
    public static final String KEY_PRODUCT_TRAFFIC = "product_traffic";
    public static final String KEY_PRODUCT_TRAFFIC_GRADE = "product_traffic_grade";
    public static final String KEY_PRODUCT_TRAFFIC_TYPE = "product_traffic_type";
    public static final String KEY_PRODUCT_PUB = "product_pub";
    public static final String KEY_USER_REGION = "user_region";
    public static final String KEY_TRAVEL_MEMBER_ADULT = "travel_member_adult";
    public static final String KEY_TRAVEL_MEMBER_YOUNGER = "travel_member_younger";
    public static final String KEY_TRAVEL_MEMBER_BABY = "travel_member_baby";
    public static final String KEY_PRODUCT_PRICE = "product_price";
    public static final String KEY_HAS_ACTIVITY = "has_activity";
    public static final String KEY_PRODUCT_FEE = "product_fee";
    public static final String KEY_ORDER_CT = "order_ct";

    // 地区
    
    public static final String REGION_KEY_CODE = "region_code";
    public static final String REGION_KEY_CODE_DESC = "region_code_desc";
    public static final String REGION_KEY_CITY = "region_city";
    public static final String REGION_KEY_CITY_DESC = "region_city_desc";
    public static final String REGION_KEY_PROVINCE = "region_province";
    public static final String REGION_KEY_PROVINCE_DESC = "region_province_desc";

    // 地区列表,里面存放了所有的地区
    public static final List regions = new ArrayList<>();

    // 酒店
    
    public static final String PUB_KEY_ID = "pub_id";
    public static final String PUB_KEY_NAME = "pub_name";
    public static final String PUB_KEY_STAT = "pub_stat";
    public static final String PUB_KEY_GRADE = "pub_grade";
    public static final String PUB_KEY_GRADE_DESC = "pub_grade_desc";
    public static final String PUB_KEY_AREA_CODE = "pub_area_code";
    public static final String PUB_KEY_ADDRESS = "pub_address";
    public static final String PUB_KEY_IS_NATIONAL = "is_national";

    // 酒店与旅游产品的映射集合
    public static final Map proMappingPub = new HashMap<>();

    //产品
    public static final String PRODUCT_KEY_ID = "product_id";
    public static final String SHOP_KEY_ID = "shop_id";

    // 产品的信息列表
    public static final List products = new ArrayList<>();

    //参数
    public static final String KEY_TOPIC = "topic";
    public static final String KEY_SOURCE = "source";
    public static final String KEY_BEGIN = "begin";
    public static final String KEY_END = "end";
    public static final String KEY_COUNT = "count";
    public static final String KEY_SLEEP = "sleep";

    public static final String SOURCE_PRODUCT = "product";
    public static final String SOURCE_PAY = "pay";

    public static final String TIME_ORDER = "time_order";

    public static final Integer TIME_RANGE_MIN = 1;
    public static final Integer TIME_RANGE_MAX = 120;

    public static final Integer COUNT_MIN = 1;
    public static final Integer COUNT_MAX = 10000;

    public static final Integer SLEEP_MIN = 1;
    public static final Integer SLEEP_MAX = 3600 * 1000;


    
     public static void main(String[] args) {
     //校验这个参数是否合法
         String checkResult = checkParams(args);
         //校验结果
         if (StringUtils.isEmpty(checkResult)){
             //2.1执行if说明参数无问题,效果是连续不断地创作数据,然后将数据生产到指定的kafka主题
             chooseFun(args);
         }else {
             System.err.println("some erros happends ["+checkResult+"]");
         }
     }
    //校验参数是否合法
    //如果这个方法返回的是空字符串,说明参数没有问题
    private static String checkParams(String[] args) {
         //定义一个返回结果的字符串
         String result = "";
         //通过加载args参数列表获取参数工具类对象
        QParameterTool tools = QParameterTool.fromArgs(args);
        //校验主题
        String  topic = tools.get(KEY_TOPIC);
        if(StringUtils.isEmpty(topic)){
          result = "agrs is Empty";
          return result;
        }
        //其他参数
        String source = tools.get(KEY_SOURCE);
        if(!SOURCE_PRODUCT.equalsIgnoreCase(source) && !SOURCE_PAY.equalsIgnoreCase(source)){
            result = "source is error .source must be ['product' or 'pay']";
            return  result;
        }
         Integer count = tools.getInt(KEY_COUNT);
        if (null == count){
            result = "count is empty";
            return result;
        }else{
            if (count > COUNT_MAX || count < COUNT_MIN){
                result = "count is unbound[" + COUNT_MIN + "~"+ COUNT_MAX +"]";
                return result;
            }
        }
        Integer sleep = tools.getInt(KEY_SLEEP);
        if( null == sleep){
            result = " sleep is empty";
            return  result;
        } else{
            if (sleep > SLEEP_MAX || sleep < SLEEP_MIN){
                result = "sleep is unbound[" + SLEEP_MIN + "~"+ SLEEP_MAX +"]";
                return result;
            }
        }
         return result;
     }
     //选择造数

    public static void chooseFun(String[] args){
         //通过加载args参数列表获取参数工具类对象
         QParameterTool tools = QParameterTool.fromArgs(args);
         String topic = tools.get(KEY_TOPIC);
         String source = tools.get(KEY_SOURCE);
         Integer count = tools.getInt(KEY_COUNT);
         Integer sleep = tools.getInt(KEY_SLEEP);
         if (SOURCE_PRODUCT.equalsIgnoreCase(source)){
             //创建关于产品的数据
             makeTravelProductData(topic,count,sleep);
         }else if(SOURCE_PAY.equalsIgnoreCase(source)){

         }
    }
    //创造产品订单数据
    public static void makeTravelProductData(String topic, Integer count, Integer sleep) {
      //发送序列化对象
        String dateFormatter = CommonConstant.FORMATTER_YYYYMMDDHHMMDD;
        String dayFormatter = CommonConstant.FORMATTER_YYYYMMDD;
        ChronoUnit chronoUnit = ChronoUnit.MINUTES;
        ChronoUnit daysChronoUnit = ChronoUnit.DAYS;
        //辅助数据
        //2.1地区信息
        Listregions = getRegions();
        //2.2 酒店信息
        Map pubs = getPubMappingPro();
        //2.3产品信息
        List productIDs = getProducts();

     }
    //旅游产品信息
    private static List getProducts() {
         if (CollectionUtils.isEmpty(products)){
             try{
                 List> productDatas = CSVUtil.readCSVFile(CSVUtil.PRODUCT_FILE,CSVUtil.QUOTE_COMMON);
                 if(CollectionUtils.isNotEmpty(productDatas)){
                     for (Map product:productDatas){
                         String productID = product.getOrDefault(PRODUCT_KEY_ID,"");
                         products.add(productID);
                     }
                 }
             }catch (Exception e){
                 log.error("TravelOrderHelper 's products errors :" + e);
             }
         }
         return  products;
    }
  //旅游产品与酒店映射关系
    private static Map getPubMappingPro() {
         //表示没有初始化
        if(MapUtils.isEmpty(proMappingPub)){
            try{
                List> pubDatas = CSVUtil.readCSVFile(CSVUtil.PUB_FILE,CSVUtil.QUOTE_COMMON);
                //校验
                if(CollectionUtils.isNotEmpty(pubDatas)){
                    //遍历
                    for (Map pub : pubDatas){
                        String pubId = pub.getOrDefault(PUB_KEY_ID,"");
                        String[] pps = pubId.split("\|");
                        proMappingPub.put(pps[0],pubId);
                    }
                }
            }catch (Exception e){
                log.error("TravelOrderelper 's pub errors:" + e);
            }
        }
        return proMappingPub;
    }


    //获取到地区列表
    private static List getRegions() {
         //如果集合为空,就是第一次获取地区,需要初始化地区信息
         if (CollectionUtils.isEmpty(regions)){
             try{
             //就是一个路径,一个是分隔符
                List > regionsDatas = CSVUtil.readCSVFile(CSVUtil.REGION_FILE,CSVUtil.QUOTE_COMMON);
                //将regionDatas中Map封装到regionDo对象
             if(CollectionUtils.isNotEmpty(regionsDatas)){
                 for (Map region : regionsDatas){
                     RegionDo regionDo = new RegionDo();
                     regionDo.setRegionCity(region.getOrDefault(REGION_KEY_CITY, ""));
                     regionDo.setRegionCityDesc(region.getOrDefault(REGION_KEY_CITY_DESC, ""));
                     regionDo.setRegionCode(region.getOrDefault(REGION_KEY_CODE, ""));
                     regionDo.setRegionCodeDesc(region.getOrDefault(REGION_KEY_CODE_DESC, ""));
                     regionDo.setRegionProvince(region.getOrDefault(REGION_KEY_PROVINCE, ""));
                     regionDo.setRegionProvinceDesc(region.getOrDefault(REGION_KEY_PROVINCE_DESC, ""));
                     regions.add(regionDo);

                 }
             }

         }catch(Exception e){
             log.error("TravelOrderHelper 's region errors : " +e);
        }
         }
         return regions;
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/837501.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号