目录
0 环境准备:
1 软件环境的搭建
1.1 jdk的安装
1.2 scala的安装
1.3 hadoop的安装
1.4 KAFKA的安装
1.5 elasticsearch的安装
1.6 redis的安装
1.7 flink的安装
1.8 安装mysql
2 数据介绍
2.1 平台功能
2.2 数据的流程描述
2.3 数据组成
2.3.1 事实数据
2.3.1.1 旅游订单数据
2.3.1.2 用户行为数据
2.3.1.3 维度数据
3 造数
3.1 资源准备
3.1.0 pom.xml
3.1.1 其他资源
3.2 开发
0 环境准备:
新建虚拟机->设置静态网络->修改映射配置
新建虚拟机->设置静态网络->修改映射配置
新建虚拟机 参考
面向大数据开发的集群之虚拟机搭建(一)_林柚晞的博客-CSDN博客
虚拟机网络+设置静态网+修改映射配置+免密登录 参考
项目0单节点的虚拟机做大数据开发(四万字全)_林柚晞的博客-CSDN博客
创建项目路径
mkdir -p /opt/apps
mkdir -p /opt/software
mkdir -p /opt/scripts
apps 是安装软件路径的路径
software是把安装包上传到虚拟机的文件
scripts是存放一些可以自动化安装软件的脚本
1 软件环境的搭建
先说明一下:把安装包上传到指定位置使用的是远程传输工具。这里省略了把安装包上传到服务器的过程。
自动化安装脚本的总结
-设置安装路径和安装包的位置,配置环境变量的位置
-判断安装路径的前驱路径是否存在
-cat追加文件路径到文件里面(比如修改环境变量,覆盖配置文件)
-正式安装步骤,解压缩等
1.1 jdk的安装
安装包:
链接:https://pan.baidu.com/s/1R5LaDIasYh4vpY_RUB_Qhw?pwd=tff2
提取码:tff2
--来自百度网盘超级会员V2的分享
cd /opt/scripts/
vi install_jdk.sh
#!/bin/bash
# author  : XXX
# version : 1.0
# date    : 2022-04-24
# desc    : install the JDK automatically
#
# Convention > configuration > coding.

# Install prefix, unpack target, tarball location and the profile that
# receives the environment variables.
INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
JDK_TAR=/opt/software/jdk-8u45-linux-x64.tar.gz
ETC_PROFILE=/etc/profile

# Print the usage hint.
usage() {
    echo "请将jdk-8u45-linux-x64.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# The tarball must have been uploaded first.
if [ ! -e "${JDK_TAR}" ]; then
    usage
    exit 0
fi

# Already installed: nothing to do.
if [ -e "${JAVA_DIR}" ]; then
    echo "${JAVA_DIR}路径已经存在,JDK已经安装过了,无需再次安装!!!"
    exit 0
fi

# Create the install prefix when missing.
if [ ! -e "${INSTALL_PREFIX}" ]; then
    mkdir -p "${INSTALL_PREFIX}"
    echo "初始化目录:${INSTALL_PREFIX}"
fi

## Unpack the JDK tarball; tar itself creates ${JAVA_DIR}, so the
## original pre-creation of that directory was removed.
tar -zxvf "${JDK_TAR}" -C "${INSTALL_PREFIX}"

## Append the environment variables. \$PATH is escaped so /etc/profile
## keeps the runtime expansion instead of today's literal PATH value.
## (Also fixed: the JDK ships tools.jar, not tool.jar.)
cat << EOF >> "${ETC_PROFILE}"
export JAVA_HOME=${JAVA_DIR}
export CLASS_PATH=.:${JAVA_DIR}/lib/dt.jar:${JAVA_DIR}/lib/tools.jar
export PATH=\$PATH:${JAVA_DIR}/bin
EOF

echo "install jdk successful!!!"
chmod +x ./install_jdk.sh
./install_jdk.sh
source /etc/profile
java -version
1.2 scala的安装
安装包:
链接:https://pan.baidu.com/s/1oBbplZTIem1K0g4Wv_PD5Q?pwd=dzcb
提取码:dzcb
--来自百度网盘超级会员V2的分享
vi /opt/scripts/install_scala.sh
#!/bin/bash
# author  : XXX
# version : 1.0
# date    : 2022-04-24
# desc    : install Scala automatically
#
# Convention > configuration > coding.

# Install prefix, unpack target, tarball location and the profile that
# receives the environment variables.
INSTALL_PREFIX=/opt/apps
SCALA_DIR=${INSTALL_PREFIX}/scala-2.11.8
SCALA_TAR=/opt/software/scala-2.11.8.tgz
ETC_PROFILE=/etc/profile

# Print the usage hint.
usage() {
    echo "请将scala-2.11.8.tgz上传到/opt/software处然后再执行此脚本!!!"
}

# The tarball must have been uploaded first.
if [ ! -e "${SCALA_TAR}" ]; then
    usage
    exit 0
fi

# Already installed: nothing to do. (The original message wrongly said
# "JDK" here -- it was copied from install_jdk.sh.)
if [ -e "${SCALA_DIR}" ]; then
    echo "${SCALA_DIR}路径已经存在,Scala已经安装过了,无需再次安装!!!"
    exit 0
fi

# Create the install prefix when missing.
if [ ! -e "${INSTALL_PREFIX}" ]; then
    mkdir -p "${INSTALL_PREFIX}"
    echo "初始化目录:${INSTALL_PREFIX}"
fi

## Unpack the Scala tarball; tar itself creates ${SCALA_DIR}.
tar -zxvf "${SCALA_TAR}" -C "${INSTALL_PREFIX}"

## Append the environment variables. \$PATH is escaped so /etc/profile
## keeps the runtime expansion instead of today's literal PATH value.
cat << EOF >> "${ETC_PROFILE}"
export SCALA_HOME=${SCALA_DIR}
export PATH=\$PATH:${SCALA_DIR}/bin
EOF

echo "install SCALA successful!!!"
chmod +x ./install_scala.sh
./install_scala.sh
source /etc/profile
scala -version
1.3 hadoop的安装
安装包:
链接:https://pan.baidu.com/s/1O-E4B1MUZjA90rsvEi1nwQ?pwd=q0ey
提取码:q0ey
--来自百度网盘超级会员V2的分享
vi /opt/scripts/install_hadoop.sh
##1. install_hadoop.sh
#!/bin/bash
# author  : lixi
# version : 1.0
# date    : 2022-04-24
# desc    : install Hadoop 2.8.1 (single node) automatically
#
# Convention > configuration > coding.

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
HADOOP_DIR=${INSTALL_PREFIX}/hadoop-2.8.1
HADOOP_TAR=/opt/software/hadoop-2.8.1.tar.gz
ETC_PROFILE=/etc/profile

# Print the usage hint.
usage() {
    echo "请将hadoop-2.8.1.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# The tarball must have been uploaded first.
if [ ! -e "${HADOOP_TAR}" ]; then
    usage
    exit 0
fi

# Already installed: nothing to do.
if [ -e "${HADOOP_DIR}" ]; then
    echo "${HADOOP_DIR}路径已经存在,Hadoop已经安装过了,无需再次安装!!!"
    exit 0
fi

# Create the install prefix when missing.
if [ ! -e "${INSTALL_PREFIX}" ]; then
    mkdir -p "${INSTALL_PREFIX}"
    echo "初始化目录:${INSTALL_PREFIX}"
fi

## Unpack the Hadoop tarball; tar itself creates ${HADOOP_DIR}.
tar -zxvf "${HADOOP_TAR}" -C "${INSTALL_PREFIX}"

## hadoop-env.sh: point JAVA_HOME at the installed JDK so the manual
## edit after running this script is no longer needed.
sed -i "s#^export JAVA_HOME=.*#export JAVA_HOME=${JAVA_DIR}#" ${HADOOP_DIR}/etc/hadoop/hadoop-env.sh

## core-site.xml
## NOTE(review): the blog paste stripped the XML markup from all of the
## heredocs below; they are reconstructed as standard <property> entries.
cat << EOF > ${HADOOP_DIR}/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.10.101:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>${HADOOP_DIR}/hdpdata</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
</configuration>
EOF

## hdfs-site.xml ("fs.replication" in the original was a typo for
## "dfs.replication", the real HDFS property name)
cat << EOF > ${HADOOP_DIR}/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.http.address</name>
        <value>qianfeng01:50070</value>
    </property>
    <property>
        <name>dfs.secondary.http.address</name>
        <value>qianfeng01:50090</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/data</value>
    </property>
    <property>
        <name>dfs.checkpoint.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value>
    </property>
    <property>
        <name>dfs.checkpoint.edits.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>
EOF

## yarn-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>qianfeng01</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>qianfeng01:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>qianfeng01:8030</value>
    </property>
</configuration>
EOF

## mapred-site.xml (rename the shipped template first)
mv ${HADOOP_DIR}/etc/hadoop/mapred-site.xml.template ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>qianfeng01:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>qianfeng01:19888</value>
    </property>
</configuration>
EOF

## slaves: single-node "cluster"
cat << EOF > ${HADOOP_DIR}/etc/hadoop/slaves
qianfeng01
EOF

## Environment variables. \$PATH is escaped so /etc/profile keeps the
## runtime expansion instead of today's literal PATH value.
cat << EOF >> ${ETC_PROFILE}
export HADOOP_HOME=${HADOOP_DIR}
export PATH=\$PATH:${HADOOP_DIR}/bin:${HADOOP_DIR}/sbin
EOF

## Format the NameNode once after installation.
${HADOOP_DIR}/bin/hdfs namenode -format

echo "install hadoop successful!!!"
chmod +x ./install_hadoop.sh
./install_hadoop.sh
source /etc/profile
hadoop version
vi /opt/apps/hadoop-2.8.1/etc/hadoop/hadoop-env.sh
就是找到JAVA_HOME
JAVA_HOME=/opt/apps/jdk1.8.0_45
start-all.sh
1.4 KAFKA的安装
安装包:链接:https://pan.baidu.com/s/1abH6Nn4mD9MR7v5r_dJvBg?pwd=qfz8
提取码:qfz8
--来自百度网盘超级会员V2的分享
tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-1.1.1
export PATH=$PATH:$KAFKA_HOME/bin
vi /opt/apps/kafka_2.11-1.1.1/config/server.properties
下面是需要修改的地方
broker.id=1
advertised.listeners=PLAINTEXT://192.168.10.101:9092
log.dirs=/opt/apps/kafka_2.11-1.1.1/log/kafka-logs
zookeeper.connect=qianfeng01:2181/kafka
vi /opt/apps/kafka_2.11-1.1.1/config/producer.properties
bootstrap.servers=qianfeng01:9092
vi /opt/apps/kafka_2.11-1.1.1/config/consumer.properties
bootstrap.servers=qianfeng01:9092
vi /opt/apps/kafka_2.11-1.1.1/config/zookeeper.properties
dataDir=/opt/apps/kafka_2.11-1.1.1/zkData
[root@hadoop kafka_2.11-1.1.1]# zookeeper-server-start.sh -daemon config/zookeeper.properties
zookeeper-shell.sh qianfeng01:2181
[root@hadoop kafka_2.11-1.1.1]# kafka-server-start.sh -daemon config/server.properties
jps
kafka-topics.sh --create --zookeeper qianfeng01:2181/kafka --replication-factor 1 --partitions 2 --topic test
kafka-topics.sh --list --zookeeper qianfeng01:2181/kafka
1.5 elasticsearch的安装
安装包:
链接:https://pan.baidu.com/s/1h3C3kneHlLuZxXg8KKzK2g?pwd=y35e
提取码:y35e
--来自百度网盘超级会员V2的分享
useradd hadoop
passwd hadoop
使用root账号
[root@hadoop software]# tar -zxvf elasticsearch-6.5.3.tar.gz -C /opt/apps/
[root@hadoop elasticsearch-6.5.3]# vi /etc/profile
export ELASTICSEARCH_HOME=/opt/apps/elasticsearch-6.5.3
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin
配置
cd /opt/apps/elasticsearch-6.5.3/config/elasticsearch.yml
就是我电脑主机号是qianfeng01(其实我也很想改,但是电脑映射习惯了,改了映射没用了。为了妥协只能一直用了) 涉及到qianfeng01大家就改成自己的主机号哦
cluster.name: qianfeng01
node.name: qianfeng01
node.master: true
node.data: true
path.data: /opt/apps/elasticsearch-6.5.3/data
path.logs: /opt/apps/elasticsearch-6.5.3/logs
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["qianfeng01"]
给普通用户授权
[root@hadoop apps]# ll
total 16
drwxr-xr-x 8 root root 4096 Dec 7 2018 elasticsearch-6.5.3
drwxrwxr-x 11 500 500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x 8 10 143 4096 Apr 11 2015 jdk1.8.0_45
drwxr-xr-x 9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
[root@hadoop apps]# chown -R hadoop:hadoop elasticsearch-6.5.3/
[root@hadoop apps]# ls
elasticsearch-6.5.3 hadoop-2.8.1 jdk1.8.0_45 kafka_2.11-1.1.1
[root@hadoop apps]# ll
total 16
drwxr-xr-x 8 hadoop hadoop 4096 Dec 7 2018 elasticsearch-6.5.3
drwxrwxr-x 11 500 500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x 8 10 143 4096 Apr 11 2015 jdk1.8.0_45
drwxr-xr-x 9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
##5.2 给予普通用户root借用权限
[root@hadoop apps]# vi /etc/sudoers
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
hadoop ALL=(ALL) ALL
切换普通用户
su hadoop
elasticsearch
我遇到的问题啊
、max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]_林柚晞的博客-CSDN博客
第二次安装还遇到了一个问题
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
修改方法
切换root用户
sysctl -w vm.max_map_count=262144
查看
sysctl -a|grep vm.max_map_count
如果显示
vm.max_map_count = 262144
就是成功了
就把账号切换成root,然后reboot一下系统
系统启动之后
su hadoop
elasticsearch -d
查看是否启动成功
jps
curl -XGET qianfeng01:9200
就是es报错如何修改
就是切换到普通用户,直接敲elasticsearch
如果是没有报错卡住了就是前台进程,就顺利了
如果要访问web端,就必须把虚拟机中的防火墙关闭
我们再搞一搞插件工具
安装包:
链接:https://pan.baidu.com/s/1-3eWVFmar03xc2RPz-rwRQ?pwd=dklo
提取码:dklo
--来自百度网盘超级会员V2的分享
这个安装包随便解压到一个文件夹下面,只要自己能找到就可以。
打开谷歌浏览器(好久没用过谷歌浏览器了,主要是不能下载插件了,所以浏览器过于简陋,连个常用表单都没有)
关掉上面的弹窗
打开谷歌浏览器的时候,看见搜索框旁边的小插件图标就可以看见es的快捷访问啦
就是我这边还有个bug没搞好,集群没连上
出现上面那个集群健康值未连接,是因为服务器里面的防火墙开了。
systemctl stop firewalld.service
查看状态
systemctl status firewalld.service
看一下web端
1.6 redis的安装
安装包:
链接:https://pan.baidu.com/s/1VoBQEeBBovSWQMTAm7T8mw?pwd=teeq
提取码:teeq
--来自百度网盘超级会员V2的分享
[root@hadoop software]# tar -zxvf redis-4.0.11.tar.gz -C /opt/apps/
[root@hadoop redis-4.0.11]# yum -y install gcc-c++
[root@hadoop redis-4.0.11]# make
[root@hadoop redis-4.0.11]# make PREFIX=/opt/apps/redis-4.0.11 install
[root@hadoop redis-4.0.11]# mkdir etc
[root@hadoop redis-4.0.11]# cp redis.conf etc/
[root@hadoop redis-4.0.11]# cd bin/
[root@hadoop bin]# cp redis-benchmark redis-server redis-cli /usr/bin/
搞配置
vi /opt/apps/redis-4.0.11/etc/redis.conf
注意都是解除注释,而不是全删啊。
daemonize yes
loglevel verbose
requirepass root
bind 192.168.10.101
就是如果 requirepass 修改会连不到redis服务器,那就注释掉密码试一试。密码我都不想配置了,实在麻烦
[root@hadoop ~]# redis-server /opt/apps/redis-4.0.11/etc/redis.conf
[root@hadoop ~]# ps -ef | grep redis
root 28066 1 0 16:07 ? 00:00:00 redis-server 192.168.10.101:6379
[root@hadoop ~]# redis-cli -h 192.168.10.101
192.168.10.101:6379> SHUTDOWN
(error) NOAUTH Authentication required.
192.168.10.101:6379> AUTH root
OK
192.168.10.101:6379> SHUTDOWN
not connected> exit
[root@hadoop ~]# ps -ef | grep redis
root 28835 1936 0 16:09 pts/0 00:00:00 grep --color=auto redis
redis的开机脚本
[root@hadoop redis-4.0.11]# vi redis
#!/bin/bash
# chkconfig: 2345 80 90
# description: start/stop script for the Redis server (chkconfig/service).

PATH=/usr/bin:/usr/local/bin:/sbin:/bin
REDIS_PORT=6379
EXEC=/opt/apps/redis-4.0.11/bin/redis-server
REDIS_CLI=/opt/apps/redis-4.0.11/bin/redis-cli
PIDFILE=/var/run/redis.pid
CONF=/opt/apps/redis-4.0.11/etc/redis.conf

case "$1" in
    start)
        # A leftover pid file means Redis is running or crashed uncleanly.
        if [ -f "${PIDFILE}" ]; then
            echo "${PIDFILE} exists, process is already running or crashed"
        else
            echo "Starting Redis Server ...."
            ${EXEC} ${CONF}
        fi
        if [ "$?" = "0" ]; then
            echo "Redis is running"
        fi
        ;;
    stop)
        if [ ! -e "${PIDFILE}" ]; then
            echo "${PIDFILE} does not exists, process is not running"
        else
            echo "Stopping..."
            # AUTH must ride along with SHUTDOWN in a single invocation:
            # the original ran `redis-cli AUTH root` and `redis-cli SHUTDOWN`
            # as two separate client processes, so the auth never applied.
            ${REDIS_CLI} -p ${REDIS_PORT} -a root SHUTDOWN
            # Wait until the server removes its pid file.
            # (The original tested `-x`, "is executable", by mistake.)
            while [ -e "${PIDFILE}" ]
            do
                echo "Waiting for Redis shutdown..."
                sleep 1
            done
            echo "Redis stopped"
        fi
        ;;
    restart|force-reload)
        ${0} stop
        ${0} start
        ;;
    *)
        echo "Usage: /etc/init.d/redis {start|stop|restart|force-reload}" >&2
        exit 1
esac
[root@hadoop redis-4.0.11]# cp redis /etc/init.d/
[root@hadoop redis-4.0.11]# chmod +x /etc/init.d/redis
##3. 查看自启动的所有程序
[root@hadoop redis-4.0.11]# chkconfig --list
##4. 将redis添加到自启动列表
[root@hadoop redis-4.0.11]# chkconfig --add redis
[root@hadoop redis-4.0.11]# chkconfig --level 2345 redis on
##5. 启动关闭测试
[root@hadoop redis-4.0.11]# systemctl start redis
[root@hadoop redis-4.0.11]# service redis start
1.7 flink的安装
安装包:
链接:https://pan.baidu.com/s/1d6tMAwfLuC47TgAsClnY3w?pwd=k1h9
提取码:k1h9
--来自百度网盘超级会员V2的分享
安装standalone
##1. 解压
[root@hadoop software]# tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /opt/apps/
##2. 配置环境变量
[root@hadoop flink-1.9.1]# vi /etc/profile
export FLINK_HOME=/opt/apps/flink-1.9.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin:$FLINK_HOME/bin
[root@hadoop flink-1.9.1]# source /etc/profile
配置
[root@hadoop flink-1.9.1]# vi conf/flink-conf.yaml
jobmanager.rpc.address: qianfeng01
jobmanager.rpc.port: 6123
rest.port: 8081
rest.address: qianfeng01
##4. 配置从机文件
[root@hadoop flink-1.9.1]# vi conf/slaves
qianfeng01
##5. 配置主机文件
[root@hadoop flink-1.9.1]# vi conf/masters
qianfeng01:8081
##6. 先启动hdfs
[root@hadoop flink-1.9.1]# start-dfs.sh
[root@hadoop flink-1.9.1]# start-cluster.sh
web端访问:
http://192.168.10.101:8081/
测试,可以不测试
##8. 测试Flink
##8.1 A窗口
[root@hadoop flink-1.9.1]# yum -y install nc
[root@hadoop flink-1.9.1]# nc -l 8000
111
222
33
444
555
33
111
222
##8.2 B窗口
[root@hadoop flink-1.9.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 8000
[root@hadoop flink-1.9.1]# tail -f log/flink-root-taskexecutor-0-hadoop.out
111 : 1
555 : 1
444 : 1
33 : 1
222 : 1
33 : 1
222 : 1
111 : 1
##9. 提交jar
[root@hadoop flink-1.9.1]# flink run examples/batch/WordCount.jar --input /etc/profile --output /home/output/00
Starting execution of program
Program execution finished
Job with JobID ca414928ddcfd57969326827f5cb7205 has finished.
Job Runtime: 21670 ms
[root@hadoop flink-1.9.1]# tail -f /home/output/00
want 1
we 1
what 1
wide 1
will 1
workaround 1
x 1
you 3
your 1
z 1
1.8 安装mysql
安装包:
链接:https://pan.baidu.com/s/1qi2i9r7loMj3Y04LsMTc6w?pwd=4u1e
提取码:4u1e
--来自百度网盘超级会员V2的分享
##1. 安装引导
[root@hadoop software]# yum -y localinstall mysql-community-release-el6-5.noarch.rpm
##2. 安装mysql
[root@hadoop software]# yum -y install mysql-server
##3. 启动mysql的服务
[root@hadoop software]# service mysqld start
##4. 设置初始root账户密码
[root@hadoop software]# mysqladmin -uroot password '123456'
##5. 登陆mysql服务
[root@hadoop software]# mysql -uroot -p123456
##6. 远程授权
mysql> grant all privileges on *.* to root@"%" identified by "123456" with grant option;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
##7. 就是可以使用mysql的client去远程链接mysql的服务了
2 数据介绍
2.1 平台功能
| 功能模块 | 说明 |
|---|---|
| 核心业务 | 基于核心业务数据(包括旅游订单、酒店住宿、车票业务)的相关实时计算、实时展示 |
| 用户行为日志 | 基于产品设计的各种埋点 |
| 风控报警 | 用户异常行为进行监控和报警 |
本项目以大数据实时平台为目标
| 技术方向 | 说明 |
|---|---|
| 实时的ETL | 实时的清洗、转换为规范的格式化的数据 |
| 实时的数据统计 | 实现各种实时的统计指标 |
| 实时的数据存储 | 实时数据落地持久化、交互搜索、动态计算提供技术支持 |
| 规则处理 | 主要是服务实时风控、报警相关需求 |
| 交互式查询 | 交互式查询明细数据或实时的聚合数据(Clickhouse、Apache Druid) |
| 实时数据展示 | 主要是服务于数据使用方,提供更直观的数据展示形式 |
2.2 数据的流程描述
实时的旅游平台,说明数据一定会经过消息中间件(消息队列/消息通道)。原始的数据源通常都是通过消息通道来完成数据的采集。随后根据实际情况将消息通道中的数据存储在各种不同的分布式存储介质中。最终统计存储在分布式介质中的数据,也可以做成BI类型可视化展示。
2.3 数据组成
2.3.1 事实数据
2.3.1.1 旅游订单数据
{
"userID":"1986",
"user_mobile":"15549477595",
"product_id":"12312312",
"product_traffic":"01",
"product_traffic_grade":"11",
"product_traffic_type" : "1",
"product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
"user_region" : "12312",
"travel_memeber_adult" : "2",
...
}
userID = 用户唯一编号
user_mobile = 用户手机号码
product_id = 旅游这个产品编号
product_traffic = 旅游的交通选择
product_traffic_grade = 坐席
product_traffic_type = 行程种类
product_pub = 旅游住宿选择
user_region = 所在区域
travel_memeber_adult = 本次旅游的成年人的人数
travel_memeber_younger = 本次旅游的儿童的人数
travel_member_baby = 本次旅游的婴儿的人数
product_price = 产品原价格
has_activity = 0表示无活动价格 | 0.8表示打八折
product_fee = 产品价格
order_id = 旅游订单
order_ct = 下单时间
2.3.1.2 用户行为数据
行为类型:action: 'launch(启动) | interactive(交互)| page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType: 'view(浏览) | slide(滑动)| click:评论、点赞、分享(点击) '
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct
{
"os":"1",
"longitude":"115.12321",
"latitude":"26.88282",
"userRegion":"12312",
"userID":"12312",
"manufacturer":"09",
"userRegionIP":"10.206.0.4",
"action":"08",
"eventType":"01"
...
}
2.3.1.3维度数据
-
旅游产品维度表
-- Travel product dimension table (MySQL / InnoDB).
CREATE TABLE travel.dim_product(
product_id text NULL COMMENT '旅游产品的编号',
product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
product_type text NULL COMMENT '旅游产品的类型',
departure_code text NULL COMMENT '旅游产品的出发地编码',
des_city_code text NULL COMMENT '旅游产品的目的地编码',
tourim_ticket_type text NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
实时的旅游平台,说明数据一定会经过消息中间件(消息队列/消息通道)。原始的数据源通常都是通过消息通道来完成数据的采集。随后根据实际情况将消息通道中的数据存储在各种不同的分布式存储介质中。最终统计存储在分布式介质中的数据,也可以做成BI类型可视化展示。
2.3.1 事实数据
2.3.1.1 旅游订单数据
{
"userID":"1986",
"user_mobile":"15549477595",
"product_id":"12312312",
"product_traffic":"01",
"product_traffic_grade":"11",
"product_traffic_type" : "1",
"product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
"user_region" : "12312",
"travel_memeber_adult" : "2",
...
}
userID = 用户唯一编号
user_mobile = 用户手机号码
product_id = 旅游这个产品编号
product_traffic = 旅游的交通选择
product_traffic_grade = 坐席
product_traffic_type = 行程种类
product_pub = 旅游住宿选择
user_region = 所在区域
travel_memeber_adult = 本次旅游的成年人的人数
travel_memeber_younger = 本次旅游的儿童的人数
travel_member_baby = 本次旅游的婴儿的人数
product_price = 产品原价格
has_activity = 0表示五活动价格 | 0.8表示打八折
product_fee = 产品价格
order_id = 旅游订单
order_ct = 下单时间
2.3.1.2 用户行为数据
行为类型:action: 'launch(启动) | interactive(交互)| page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType: 'view(浏览) | slide(滑动)| click:评论、点赞、分享(点击) '
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct
{
"os":"1",
"longitude":"115.12321",
"latitude":"26.88282",
"userRegion":"12312",
"userID":"12312",
"manufacturer":"09",
"userRegionIP":"10.206.0.4",
"action":"08",
"eventType":"01"
...
}
2.3.1.3维度数据
-
旅游产品维度表
CREATE TABLE travel.dim_product(
product_id text NULL COMMENT '旅游产品的编号',
product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
product_type text NULL COMMENT '旅游产品的类型',
departure_code text NULL COMMENT '旅游产品的出发地编码',
des_city_code text NULL COMMENT '旅游产品的目的地编码',
tourim_ticket_type text NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
{
"userID":"1986",
"user_mobile":"15549477595",
"product_id":"12312312",
"product_traffic":"01",
"product_traffic_grade":"11",
"product_traffic_type" : "1",
"product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
"user_region" : "12312",
"travel_memeber_adult" : "2",
...
}
userID = 用户唯一编号
user_mobile = 用户手机号码
product_id = 旅游这个产品编号
product_traffic = 旅游的交通选择
product_traffic_grade = 坐席
product_traffic_type = 行程种类
product_pub = 旅游住宿选择
user_region = 所在区域
travel_memeber_adult = 本次旅游的成年人的人数
travel_memeber_younger = 本次旅游的儿童的人数
travel_member_baby = 本次旅游的婴儿的人数
product_price = 产品原价格
has_activity = 0表示五活动价格 | 0.8表示打八折
product_fee = 产品价格
order_id = 旅游订单
order_ct = 下单时间
2.3.1.2 用户行为数据
行为类型:action: 'launch(启动) | interactive(交互)| page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType: 'view(浏览) | slide(滑动)| click:评论、点赞、分享(点击) '
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct
{
"os":"1",
"longitude":"115.12321",
"latitude":"26.88282",
"userRegion":"12312",
"userID":"12312",
"manufacturer":"09",
"userRegionIP":"10.206.0.4",
"action":"08",
"eventType":"01"
...
}
2.3.1.3维度数据
-
旅游产品维度表
CREATE TABLE travel.dim_product(
product_id text NULL COMMENT '旅游产品的编号',
product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
product_type text NULL COMMENT '旅游产品的类型',
departure_code text NULL COMMENT '旅游产品的出发地编码',
des_city_code text NULL COMMENT '旅游产品的目的地编码',
tourim_ticket_type text NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
行为类型:action: 'launch(启动) | interactive(交互)| page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType: 'view(浏览) | slide(滑动)| click:评论、点赞、分享(点击) '
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct
{
"os":"1",
"longitude":"115.12321",
"latitude":"26.88282",
"userRegion":"12312",
"userID":"12312",
"manufacturer":"09",
"userRegionIP":"10.206.0.4",
"action":"08",
"eventType":"01"
...
}
-
旅游产品维度表
CREATE TABLE travel.dim_product(
product_id text NULL COMMENT '旅游产品的编号',
product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
product_type text NULL COMMENT '旅游产品的类型',
departure_code text NULL COMMENT '旅游产品的出发地编码',
des_city_code text NULL COMMENT '旅游产品的目的地编码',
tourim_ticket_type text NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
酒店维度表
-- Hotel (accommodation) dimension table (MySQL / InnoDB).
-- Fix: the original had a trailing comma after the last column
-- definition, which is a MySQL syntax error.
CREATE TABLE travel.dim_pub(
pub_id text NULL COMMENT '酒店的编号',
pub_name text NULL COMMENT '酒店的名称',
pub_stat text NULL COMMENT '酒店的星级',
pub_grade text NULL COMMENT '酒店的等级编码',
pub_grade_desc text NULL COMMENT '酒店的等级编码描述',
pub_area_code text NULL COMMENT '酒店所在区域编码',
pub_address text NULL COMMENT '酒店地址',
is_national text NULL COMMENT '是否属于境内'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
导入维度数据到mysql
sql文件(因为文件比较大,然后需要自己下载到本地,使用navicat远程连接服务器上的mysql)
链接:https://pan.baidu.com/s/1dfCxtrmgdCNnFeCfM_cDdQ?pwd=btl3
提取码:btl3
--来自百度网盘超级会员V2的分享
打开一下服务器
打开windows上面的navicat
[root@qianfeng01 ~]# mysql -uroot -p123456
下面就开始api开发吧
3 造数
打开idea,创建一个maven项目
3.1 资源准备
3.1.0pom.xml
4.0.0
com.qf.bigdata
qfdata
1.0
UTF-8
UTF-8
1.8
1.1.8
1.7.22
1.2.16
1.2.29
2.4.0
2.8.5
2.9.3
21.0
5.1.44
3.5.1
3.4
2.4
1.6
1.8.1
1.8.1
2.8.1
0.11
1.1.1
3.0.3
compile
org.slf4j
slf4j-api
${slf4j.version}
${scope}
log4j
log4j
${log4j.version}
${scope}
org.slf4j
jcl-over-slf4j
${slf4j.version}
${scope}
ch.qos.logback
logback-classic
${logback.version}
${scope}
ch.qos.logback
logback-core
${logback.version}
${scope}
ch.qos.logback
logback-access
${logback.version}
${scope}
com.google.guava
guava
${guava.version}
${scope}
com.alibaba
fastjson
${fastjson.version}
${scope}
com.google.code.gson
gson
${gson.version}
com.jayway.jsonpath
json-path
${json-path.version}
${scope}
mysql
mysql-connector-java
${mysql.jdbc.version}
${scope}
com.google.protobuf
protobuf-java
${protobuf.version}
${scope}
org.apache.commons
commons-csv
${commons-csv.version}
${scope}
org.apache.commons
commons-lang3
${commons-lang3.version}
${scope}
commons-io
commons-io
${commons-io.version}
${scope}
org.apache.hadoop
hadoop-common
${hadoop.version}
${scope}
protobuf-java
com.google.protobuf
org.apache.hadoop
hadoop-client
${hadoop.version}
${scope}
jsp-api
javax.servlet.jsp:
org.slf4j
log4j-over-slf4j
org.slf4j
slf4j-log4j12
org.apache.hadoop
hadoop-hdfs
${hadoop.version}
${scope}
com.101tec
zkclient
${zk.client.version}
${scope}
org.apache.kafka
kafka_2.11
${kafka.version}
${scope}
org.apache.kafka
kafka-clients
${kafka.version}
${scope}
com.esotericsoftware
kryo
${kryo.version}
${scope}
org.apache.avro
avro
${avro.version}
org.apache.parquet
parquet-avro
${parquet.version}
${scope}
org.apache.parquet
parquet-common
${parquet.version}
${scope}
org.apache.parquet
parquet-tools
${parquet.version}
${scope}
org.apache.parquet
parquet-hadoop-bundle
${parquet.version}
${scope}
dev
true
${project.basedir}/src/main/resources
true
org.apache.maven.plugins
maven-shade-plugin
target/classes
target/test-classes
org.apache.maven.plugins
maven-shade-plugin
3.1.0
TravelCurLogJob
package
shade
true
TravelCurLogJob
*:*
META-INF
// Parses command-line arguments of the form "--key value" / "-key value"
// into a QParameterTool. A key followed by another key (or by nothing)
// is stored with NO_VALUE_KEY as its value; a purely numeric token is
// always treated as a value, so negative numbers are not mistaken for keys.
public static QParameterTool fromArgs(String[] args) {
final Map map = new HashMap<>(args.length / 2);
int i = 0;
while (i < args.length) {
final String key;
// Strip the "--" or "-" prefix to obtain the bare key name.
if (args[i].startsWith("--")) {
key = args[i].substring(2);
} else if (args[i].startsWith("-")) {
key = args[i].substring(1);
} else {
throw new IllegalArgumentException(
String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
Arrays.toString(args), args[i]));
}
if (key.isEmpty()) {
throw new IllegalArgumentException(
"The input " + Arrays.toString(args) + " contains an empty argument");
}
i += 1; // try to find the value
if (i >= args.length) {
// The key was the last token: it has no value.
map.put(key, NO_VALUE_KEY);
} else if (NumberUtils.isNumber(args[i])) {
// A numeric token is consumed as this key's value.
map.put(key, args[i]);
i += 1;
} else if (args[i].startsWith("--") || args[i].startsWith("-")) {
// the argument cannot be a negative number because we checked earlier
// -> the next argument is a parameter name
map.put(key, NO_VALUE_KEY);
} else {
map.put(key, args[i]);
i += 1;
}
}
return fromMap(map);
}
// Loads parameters from a .properties file given its path.
public static QParameterTool fromPropertiesFile(String path) throws IOException {
return fromPropertiesFile(new File(path));
}
// Loads parameters from a .properties file; the file must exist.
public static QParameterTool fromPropertiesFile(File file) throws IOException {
if (!file.exists()) {
throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist");
}
try (FileInputStream stream = new FileInputStream(file)) {
return fromPropertiesFile(stream);
}
}
// Loads parameters from an already-open properties stream.
public static QParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
Properties properties = new Properties();
properties.load(inputStream);
return fromMap((Map) properties);
}
// Wraps an existing key/value map.
public static QParameterTool fromMap(Map map) {
return new QParameterTool(map);
}
// Exposes the JVM system properties as parameters.
public static QParameterTool fromSystemProperties() {
return fromMap((Map) System.getProperties());
}
// ------------------ QParameterTool ------------------------
// Unmodifiable view of the parsed parameters.
public Map data = null;
// data which is only used on the client and does not need to be transmitted
protected transient Map defaultData;
// Keys never queried so far; shrinks as get()/has() are called.
protected transient Set unrequestedParameters;
// Private: instances are created only through the fromXxx() factories.
private QParameterTool(Map data) {
this.data = Collections.unmodifiableMap(new HashMap<>(data));
this.defaultData = new ConcurrentHashMap<>(data.size());
this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
// Initially every parameter counts as unrequested.
unrequestedParameters.addAll(data.keySet());
}
// Two QParameterTools are equal when their data, recorded defaults and
// unrequested-parameter sets all match.
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
QParameterTool that = (QParameterTool) o;
return Objects.equals(data, that.data) &&
Objects.equals(defaultData, that.defaultData) &&
Objects.equals(unrequestedParameters, that.unrequestedParameters);
}
// Hash code consistent with equals().
@Override
public int hashCode() {
return Objects.hash(data, defaultData, unrequestedParameters);
}
// Returns a read-only view of the parameters that were never queried.
public Set getUnrequestedParameters() {
return Collections.unmodifiableSet(unrequestedParameters);
}
// ------------------ Get data from the util ----------------
// Number of parsed parameters.
public int getNumberOfParameters() {
return data.size();
}
// Returns the value for the given key, or null when absent. Marks the
// key as requested and records an "undefined" default for it.
public String get(String key) {
addToDefaults(key, null);
unrequestedParameters.remove(key);
return data.get(key);
}
// Like get(), but fails fast when the key has no value.
public String getRequired(String key) {
addToDefaults(key, null);
String value = get(key);
if (value == null) {
throw new RuntimeException("No data for required key '" + key + "'");
}
return value;
}
// Returns the value for the key, falling back to defaultValue when absent.
public String get(String key, String defaultValue) {
addToDefaults(key, defaultValue);
String value = get(key);
return value == null ? defaultValue : value;
}
// True when the key is present; also marks it as requested.
public boolean has(String value) {
addToDefaults(value, null);
unrequestedParameters.remove(value);
return data.containsKey(value);
}
// -------------- Integer
// Required int-valued parameter; throws when missing or non-numeric.
public int getInt(String key) {
addToDefaults(key, null);
return Integer.parseInt(getRequired(key));
}
// Optional int-valued parameter with a fallback default.
public int getInt(String key, int defaultValue) {
addToDefaults(key, Integer.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Integer.parseInt(raw);
}
// -------------- LONG
// Required long-valued parameter; throws when missing or non-numeric.
public long getLong(String key) {
addToDefaults(key, null);
return Long.parseLong(getRequired(key));
}
// Optional long-valued parameter with a fallback default.
public long getLong(String key, long defaultValue) {
addToDefaults(key, Long.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Long.parseLong(raw);
}
// -------------- FLOAT
// Required float-valued parameter; throws when missing or non-numeric.
public float getFloat(String key) {
addToDefaults(key, null);
return Float.valueOf(getRequired(key));
}
// Optional float-valued parameter with a fallback default.
public float getFloat(String key, float defaultValue) {
addToDefaults(key, Float.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Float.valueOf(raw);
}
// -------------- DOUBLE
// Required double-valued parameter; throws when missing or non-numeric.
public double getDouble(String key) {
addToDefaults(key, null);
return Double.valueOf(getRequired(key));
}
// Optional double-valued parameter with a fallback default.
public double getDouble(String key, double defaultValue) {
addToDefaults(key, Double.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Double.valueOf(raw);
}
// -------------- BOOLEAN
// Required boolean-valued parameter; anything but "true" (ignoring case)
// parses as false, matching Boolean.valueOf.
public boolean getBoolean(String key) {
addToDefaults(key, null);
return Boolean.valueOf(getRequired(key));
}
// Optional boolean-valued parameter with a fallback default.
public boolean getBoolean(String key, boolean defaultValue) {
addToDefaults(key, Boolean.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Boolean.valueOf(raw);
}
// -------------- SHORT
// Required short-valued parameter; throws when missing or non-numeric.
public short getShort(String key) {
addToDefaults(key, null);
return Short.valueOf(getRequired(key));
}
// Optional short-valued parameter with a fallback default.
public short getShort(String key, short defaultValue) {
addToDefaults(key, Short.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Short.valueOf(raw);
}
// -------------- BYTE
// Required byte-valued parameter; throws when missing or non-numeric.
public byte getByte(String key) {
addToDefaults(key, null);
return Byte.valueOf(getRequired(key));
}
// Optional byte-valued parameter with a fallback default.
public byte getByte(String key, byte defaultValue) {
addToDefaults(key, Byte.toString(defaultValue));
String raw = get(key);
return raw == null ? defaultValue : Byte.valueOf(raw);
}
// --------------- Internals
// Records a default value for the key. A concrete value may replace a
// previously recorded "undefined" placeholder, but never another
// concrete default.
protected void addToDefaults(String key, String value) {
final String currentValue = defaultData.get(key);
if (currentValue == null) {
if (value == null) {
value = DEFAULT_UNDEFINED;
}
defaultData.put(key, value);
} else {
// there is already an entry for this key. Check if the value is the undefined
if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) {
// update key with better default value
defaultData.put(key, value);
}
}
}
// ------------------------- Export to different targets -------------------------
// Exports all parameters as a java.util.Properties object.
public Properties getProperties() {
Properties result = new Properties();
result.putAll(this.data);
return result;
}
// Writes the recorded default values to pathToFile, overwriting it.
public void createPropertiesFile(String pathToFile) throws IOException {
createPropertiesFile(pathToFile, true);
}
// Writes the recorded default values to pathToFile. When overwrite is
// false and the file already exists, fails instead of replacing it.
public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
final File file = new File(pathToFile);
if (file.exists()) {
if (!overwrite) {
throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed");
}
file.delete();
}
final Properties defaultProps = new Properties();
defaultProps.putAll(this.defaultData);
try (final OutputStream out = new FileOutputStream(file)) {
defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()");
}
}
// Clone backed by the same parameter values; the bookkeeping state
// (defaults, unrequested set) starts fresh in the copy.
protected Object clone() throws CloneNotSupportedException {
return new QParameterTool(this.data);
}
// ------------------------- Interaction with other ParameterUtils -------------------------
// Merges this tool with another; on key clashes the other tool's value
// wins. A key counts as requested in the result when it was requested
// in the tool it came from.
public QParameterTool mergeWith(QParameterTool other) {
final Map resultData = new HashMap<>(data.size() + other.data.size());
resultData.putAll(data);
resultData.putAll(other.data);
final QParameterTool ret = new QParameterTool(resultData);
// Keys already requested on either side must not appear unrequested.
final HashSet requestedParametersLeft = new HashSet<>(data.keySet());
requestedParametersLeft.removeAll(unrequestedParameters);
final HashSet requestedParametersRight = new HashSet<>(other.data.keySet());
requestedParametersRight.removeAll(other.unrequestedParameters);
ret.unrequestedParameters.removeAll(requestedParametersLeft);
ret.unrequestedParameters.removeAll(requestedParametersRight);
return ret;
}
// ------------------------- ExecutionConfig.UserConfig interface -------------------------
// Returns the (unmodifiable) backing parameter map.
public Map toMap() {
return data;
}
// ------------------------- Serialization ---------------------------------------------
// Rebuilds the transient bookkeeping collections after deserialization;
// note the unrequested set restarts empty.
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
defaultData = new ConcurrentHashMap<>(data.size());
unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
}
}
package com.qf.bigdata.realtime.util;
import com.qf.bigdata.realtime.constant.CommonConstant;
import com.qf.bigdata.realtime.dvo.RegionDo;
import com.qf.bigdata.realtime.util.CSVUtil;
import com.qf.bigdata.realtime.util.QParameterTool;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.LdapGroupsMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Int;
import java.io.Serializable;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Helper that fabricates mock travel-order data for the realtime pipeline and
 * (eventually) produces it to a Kafka topic. The String constants below are
 * the field keys used in the generated records.
 *
 * NOTE(review): generic type parameters (e.g. List&lt;RegionDo&gt;) appear to have
 * been stripped from this file during extraction; the raw types compile but
 * should be re-parameterized.
 */
public class TravelOrderHelper implements Serializable {
// logger for this class
private static final Logger log = LoggerFactory.getLogger(TravelOrderHelper.class);
// ---- order record field keys ----
public static final String KEY_KAFKA_ID = "KAFKA_ID";
public static final String KEY_ORDER_ID = "order_id";
public static final String KEY_USER_ID = "userid";
public static final String KEY_USER_MOBILE = "user_mobile";
public static final String KEY_PRODUCT_ID = "product_id";
public static final String KEY_PRODUCT_TRAFFIC = "product_traffic";
public static final String KEY_PRODUCT_TRAFFIC_GRADE = "product_traffic_grade";
public static final String KEY_PRODUCT_TRAFFIC_TYPE = "product_traffic_type";
public static final String KEY_PRODUCT_PUB = "product_pub";
public static final String KEY_USER_REGION = "user_region";
public static final String KEY_TRAVEL_MEMBER_ADULT = "travel_member_adult";
public static final String KEY_TRAVEL_MEMBER_YOUNGER = "travel_member_younger";
public static final String KEY_TRAVEL_MEMBER_BABY = "travel_member_baby";
public static final String KEY_PRODUCT_PRICE = "product_price";
public static final String KEY_HAS_ACTIVITY = "has_activity";
public static final String KEY_PRODUCT_FEE = "product_fee";
public static final String KEY_ORDER_CT = "order_ct";
// ---- region field keys ----
public static final String REGION_KEY_CODE = "region_code";
public static final String REGION_KEY_CODE_DESC = "region_code_desc";
public static final String REGION_KEY_CITY = "region_city";
public static final String REGION_KEY_CITY_DESC = "region_city_desc";
public static final String REGION_KEY_PROVINCE = "region_province";
public static final String REGION_KEY_PROVINCE_DESC = "region_province_desc";
// cache holding all regions (presumably lazily filled by getRegions — confirm)
public static final List regions = new ArrayList<>();
// ---- hotel (pub) field keys ----
public static final String PUB_KEY_ID = "pub_id";
public static final String PUB_KEY_NAME = "pub_name";
public static final String PUB_KEY_STAT = "pub_stat";
public static final String PUB_KEY_GRADE = "pub_grade";
public static final String PUB_KEY_GRADE_DESC = "pub_grade_desc";
public static final String PUB_KEY_AREA_CODE = "pub_area_code";
public static final String PUB_KEY_ADDRESS = "pub_address";
public static final String PUB_KEY_IS_NATIONAL = "is_national";
// mapping from travel product to hotel
public static final Map proMappingPub = new HashMap<>();
// ---- product field keys ----
public static final String PRODUCT_KEY_ID = "product_id";
public static final String SHOP_KEY_ID = "shop_id";
// cache of product info (lazily filled by getProducts when empty)
public static final List products = new ArrayList<>();
// ---- CLI parameter names ----
public static final String KEY_TOPIC = "topic";
public static final String KEY_SOURCE = "source";
public static final String KEY_BEGIN = "begin";
public static final String KEY_END = "end";
public static final String KEY_COUNT = "count";
public static final String KEY_SLEEP = "sleep";
public static final String SOURCE_PRODUCT = "product";
public static final String SOURCE_PAY = "pay";
public static final String TIME_ORDER = "time_order";
// ---- validation bounds for the CLI parameters ----
public static final Integer TIME_RANGE_MIN = 1;
public static final Integer TIME_RANGE_MAX = 120;
public static final Integer COUNT_MIN = 1;
public static final Integer COUNT_MAX = 10000;
public static final Integer SLEEP_MIN = 1;
public static final Integer SLEEP_MAX = 3600 * 1000;
/**
 * Entry point: validates the CLI arguments and, if they are valid, starts
 * producing mock data to the configured Kafka topic.
 *
 * @param args CLI args parsed by QParameterTool (--topic, --source, --count, --sleep)
 */
public static void main(String[] args) {
    // An empty check result means all parameters are valid.
    String checkResult = checkParams(args);
    if (StringUtils.isEmpty(checkResult)) {
        // Continuously fabricate data and produce it to the given Kafka topic.
        chooseFun(args);
    } else {
        // Fix: the original message read "some erros happends".
        System.err.println("some errors happened [" + checkResult + "]");
    }
}
/**
 * Validates the CLI arguments.
 *
 * @param args raw CLI arguments
 * @return an empty string when all parameters are valid, otherwise a
 *         human-readable description of the first problem found
 */
private static String checkParams(String[] args) {
    QParameterTool tools = QParameterTool.fromArgs(args);

    // topic is mandatory
    String topic = tools.get(KEY_TOPIC);
    if (StringUtils.isEmpty(topic)) {
        // Fix: the original message "agrs is Empty" was both a typo and
        // misleading — the failure is specifically a missing topic.
        return "topic is empty";
    }

    // source must name one of the supported generators
    String source = tools.get(KEY_SOURCE);
    if (!SOURCE_PRODUCT.equalsIgnoreCase(source) && !SOURCE_PAY.equalsIgnoreCase(source)) {
        return "source is error .source must be ['product' or 'pay']";
    }

    // Fix: getInt(key) throws when the key is missing (it delegates to
    // getRequired internally), so the original "null == count" branch was
    // unreachable dead code. Probe with has() first so a missing value
    // yields the error string instead of an uncaught RuntimeException.
    if (!tools.has(KEY_COUNT)) {
        return "count is empty";
    }
    int count = tools.getInt(KEY_COUNT);
    if (count > COUNT_MAX || count < COUNT_MIN) {
        return "count is unbound[" + COUNT_MIN + "~" + COUNT_MAX + "]";
    }

    // Same fix for sleep: the "null == sleep" branch could never be taken.
    if (!tools.has(KEY_SLEEP)) {
        return " sleep is empty";
    }
    int sleep = tools.getInt(KEY_SLEEP);
    if (sleep > SLEEP_MAX || sleep < SLEEP_MIN) {
        return "sleep is unbound[" + SLEEP_MIN + "~" + SLEEP_MAX + "]";
    }

    // Empty string signals success to the caller.
    return "";
}
/**
 * Dispatches to the matching data generator based on the "source" argument.
 * (The "pay" branch is intentionally empty: not implemented yet.)
 *
 * @param args CLI args already validated by checkParams
 */
public static void chooseFun(String[] args) {
    // Re-parse the args into the parameter tool.
    QParameterTool params = QParameterTool.fromArgs(args);
    String topic = params.get(KEY_TOPIC);
    String source = params.get(KEY_SOURCE);
    Integer count = params.getInt(KEY_COUNT);
    Integer sleep = params.getInt(KEY_SLEEP);

    if (SOURCE_PRODUCT.equalsIgnoreCase(source)) {
        // generate product order data
        makeTravelProductData(topic, count, sleep);
    } else if (SOURCE_PAY.equalsIgnoreCase(source)) {
        // payment data generation: not implemented
    }
}
/**
 * Fabricates travel product order data. Incomplete: currently only gathers
 * the lookup data (regions, hotels, products); record generation and the
 * Kafka send are not implemented yet.
 *
 * @param topic target Kafka topic
 * @param count number of records to fabricate
 * @param sleep pause between records, in milliseconds
 */
public static void makeTravelProductData(String topic, Integer count, Integer sleep) {
    // time formatting helpers for order timestamps
    String dateFormatter = CommonConstant.FORMATTER_YYYYMMDDHHMMDD;
    String dayFormatter = CommonConstant.FORMATTER_YYYYMMDD;
    ChronoUnit chronoUnit = ChronoUnit.MINUTES;
    ChronoUnit daysChronoUnit = ChronoUnit.DAYS;
    // lookup data
    // 2.1 region info
    // Fix: the original read "Listregions = getRegions();" — the missing
    // space made it an assignment to an undeclared identifier (a compile
    // error; presumably List<RegionDo> before generics were stripped).
    List regions = getRegions();
    // 2.2 hotel info
    Map pubs = getPubMappingPro();
    // 2.3 product info
    List productIDs = getProducts();
}
//旅游产品信息
private static List getProducts() {
if (CollectionUtils.isEmpty(products)){
try{
List
4.0.0 com.qf.bigdata qfdata1.0 UTF-8 UTF-8 1.8 1.1.8 1.7.22 1.2.16 1.2.29 2.4.0 2.8.5 2.9.3 21.0 5.1.44 3.5.1 3.4 2.4 1.6 1.8.1 1.8.12.8.1 0.11 1.1.1 3.0.3 compile org.slf4j slf4j-api${slf4j.version} ${scope} log4j log4j${log4j.version} ${scope} org.slf4j jcl-over-slf4j${slf4j.version} ${scope} ch.qos.logback logback-classic${logback.version} ${scope} ch.qos.logback logback-core${logback.version} ${scope} ch.qos.logback logback-access${logback.version} ${scope} com.google.guava guava${guava.version} ${scope} com.alibaba fastjson${fastjson.version} ${scope} com.google.code.gson gson${gson.version} com.jayway.jsonpath json-path${json-path.version} ${scope} mysql mysql-connector-java${mysql.jdbc.version} ${scope} com.google.protobuf protobuf-java${protobuf.version} ${scope} org.apache.commons commons-csv${commons-csv.version} ${scope} org.apache.commons commons-lang3${commons-lang3.version} ${scope} commons-io commons-io${commons-io.version} ${scope} org.apache.hadoop hadoop-common${hadoop.version} ${scope} protobuf-java com.google.protobuf org.apache.hadoop hadoop-client${hadoop.version} ${scope} jsp-api javax.servlet.jsp: org.slf4j log4j-over-slf4jorg.slf4j slf4j-log4j12org.apache.hadoop hadoop-hdfs${hadoop.version} ${scope} com.101tec zkclient${zk.client.version} ${scope} org.apache.kafka kafka_2.11${kafka.version} ${scope} org.apache.kafka kafka-clients${kafka.version} ${scope} com.esotericsoftware kryo${kryo.version} ${scope} org.apache.avro avro${avro.version} org.apache.parquet parquet-avro${parquet.version} ${scope} org.apache.parquet parquet-common${parquet.version} ${scope} org.apache.parquet parquet-tools${parquet.version} ${scope} org.apache.parquet parquet-hadoop-bundle${parquet.version} ${scope} dev true${project.basedir}/src/main/resources true org.apache.maven.plugins maven-shade-plugintarget/classes target/test-classes org.apache.maven.plugins maven-shade-plugin3.1.0 TravelCurLogJob package shade true TravelCurLogJob *:* META-INF public static 
QParameterTool fromArgs(String[] args) { final Map map = new HashMap<>(args.length / 2); int i = 0; while (i < args.length) { final String key; if (args[i].startsWith("--")) { key = args[i].substring(2); } else if (args[i].startsWith("-")) { key = args[i].substring(1); } else { throw new IllegalArgumentException( String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.", Arrays.toString(args), args[i])); } if (key.isEmpty()) { throw new IllegalArgumentException( "The input " + Arrays.toString(args) + " contains an empty argument"); } i += 1; // try to find the value if (i >= args.length) { map.put(key, NO_VALUE_KEY); } else if (NumberUtils.isNumber(args[i])) { map.put(key, args[i]); i += 1; } else if (args[i].startsWith("--") || args[i].startsWith("-")) { // the argument cannot be a negative number because we checked earlier // -> the next argument is a parameter name map.put(key, NO_VALUE_KEY); } else { map.put(key, args[i]); i += 1; } } return fromMap(map); } public static QParameterTool fromPropertiesFile(String path) throws IOException { File propertiesFile = new File(path); return fromPropertiesFile(propertiesFile); } public static QParameterTool fromPropertiesFile(File file) throws IOException { if (!file.exists()) { throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist"); } try (FileInputStream fis = new FileInputStream(file)) { return fromPropertiesFile(fis); } } public static QParameterTool fromPropertiesFile(InputStream inputStream) throws IOException { Properties props = new Properties(); props.load(inputStream); return fromMap((Map) props); } public static QParameterTool fromMap(Map map) { return new QParameterTool(map); } public static QParameterTool fromSystemProperties() { return fromMap((Map) System.getProperties()); } // ------------------ QParameterTool ------------------------ public Map data = null; // data which is only used on the client and does not need to be transmitted 
protected transient Map defaultData; protected transient Set unrequestedParameters; private QParameterTool(Map data) { this.data = Collections.unmodifiableMap(new HashMap<>(data)); this.defaultData = new ConcurrentHashMap<>(data.size()); this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); unrequestedParameters.addAll(data.keySet()); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } QParameterTool that = (QParameterTool) o; return Objects.equals(data, that.data) && Objects.equals(defaultData, that.defaultData) && Objects.equals(unrequestedParameters, that.unrequestedParameters); } @Override public int hashCode() { return Objects.hash(data, defaultData, unrequestedParameters); } public Set getUnrequestedParameters() { return Collections.unmodifiableSet(unrequestedParameters); } // ------------------ Get data from the util ---------------- public int getNumberOfParameters() { return data.size(); } public String get(String key) { addToDefaults(key, null); unrequestedParameters.remove(key); return data.get(key); } public String getRequired(String key) { addToDefaults(key, null); String value = get(key); if (value == null) { throw new RuntimeException("No data for required key '" + key + "'"); } return value; } public String get(String key, String defaultValue) { addToDefaults(key, defaultValue); String value = get(key); if (value == null) { return defaultValue; } else { return value; } } public boolean has(String value) { addToDefaults(value, null); unrequestedParameters.remove(value); return data.containsKey(value); } // -------------- Integer public int getInt(String key) { addToDefaults(key, null); String value = getRequired(key); return Integer.parseInt(value); } public int getInt(String key, int defaultValue) { addToDefaults(key, Integer.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } 
return Integer.parseInt(value); } // -------------- LONG public long getLong(String key) { addToDefaults(key, null); String value = getRequired(key); return Long.parseLong(value); } public long getLong(String key, long defaultValue) { addToDefaults(key, Long.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } return Long.parseLong(value); } // -------------- FLOAT public float getFloat(String key) { addToDefaults(key, null); String value = getRequired(key); return Float.valueOf(value); } public float getFloat(String key, float defaultValue) { addToDefaults(key, Float.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Float.valueOf(value); } } // -------------- DOUBLE public double getDouble(String key) { addToDefaults(key, null); String value = getRequired(key); return Double.valueOf(value); } public double getDouble(String key, double defaultValue) { addToDefaults(key, Double.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Double.valueOf(value); } } // -------------- BOOLEAN public boolean getBoolean(String key) { addToDefaults(key, null); String value = getRequired(key); return Boolean.valueOf(value); } public boolean getBoolean(String key, boolean defaultValue) { addToDefaults(key, Boolean.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Boolean.valueOf(value); } } // -------------- SHORT public short getShort(String key) { addToDefaults(key, null); String value = getRequired(key); return Short.valueOf(value); } public short getShort(String key, short defaultValue) { addToDefaults(key, Short.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Short.valueOf(value); } } // -------------- BYTE public byte getByte(String key) { addToDefaults(key, null); String value = getRequired(key); return 
Byte.valueOf(value); } public byte getByte(String key, byte defaultValue) { addToDefaults(key, Byte.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Byte.valueOf(value); } } // --------------- Internals protected void addToDefaults(String key, String value) { final String currentValue = defaultData.get(key); if (currentValue == null) { if (value == null) { value = DEFAULT_UNDEFINED; } defaultData.put(key, value); } else { // there is already an entry for this key. Check if the value is the undefined if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) { // update key with better default value defaultData.put(key, value); } } } // ------------------------- Export to different targets ------------------------- public Properties getProperties() { Properties props = new Properties(); props.putAll(this.data); return props; } public void createPropertiesFile(String pathToFile) throws IOException { createPropertiesFile(pathToFile, true); } public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException { final File file = new File(pathToFile); if (file.exists()) { if (overwrite) { file.delete(); } else { throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed"); } } final Properties defaultProps = new Properties(); defaultProps.putAll(this.defaultData); try (final OutputStream out = new FileOutputStream(file)) { defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()"); } } protected Object clone() throws CloneNotSupportedException { return new QParameterTool(this.data); } // ------------------------- Interaction with other ParameterUtils ------------------------- public QParameterTool mergeWith(QParameterTool other) { final Map resultData = new HashMap<>(data.size() + other.data.size()); resultData.putAll(data); resultData.putAll(other.data); final QParameterTool ret = new QParameterTool(resultData); final 
HashSet requestedParametersLeft = new HashSet<>(data.keySet()); requestedParametersLeft.removeAll(unrequestedParameters); final HashSet requestedParametersRight = new HashSet<>(other.data.keySet()); requestedParametersRight.removeAll(other.unrequestedParameters); ret.unrequestedParameters.removeAll(requestedParametersLeft); ret.unrequestedParameters.removeAll(requestedParametersRight); return ret; } // ------------------------- ExecutionConfig.UserConfig interface ------------------------- public Map toMap() { return data; } // ------------------------- Serialization --------------------------------------------- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); defaultData = new ConcurrentHashMap<>(data.size()); unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); } }
package com.qf.bigdata.realtime.util;
import com.qf.bigdata.realtime.constant.CommonConstant;
import com.qf.bigdata.realtime.dvo.RegionDo;
import com.qf.bigdata.realtime.util.CSVUtil;
import com.qf.bigdata.realtime.util.QParameterTool;
import com.sun.org.apache.regexp.internal.RE;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.LdapGroupsMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Int;
import java.io.Serializable;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Helper that fabricates mock travel-order data for the realtime pipeline and
 * (eventually) produces it to a Kafka topic. The String constants below are
 * the field keys used in the generated records.
 *
 * NOTE(review): generic type parameters (e.g. List&lt;RegionDo&gt;) appear to have
 * been stripped from this file during extraction; the raw types compile but
 * should be re-parameterized.
 */
public class TravelOrderHelper implements Serializable {
// logger for this class
private static final Logger log = LoggerFactory.getLogger(TravelOrderHelper.class);
// ---- order record field keys ----
public static final String KEY_KAFKA_ID = "KAFKA_ID";
public static final String KEY_ORDER_ID = "order_id";
public static final String KEY_USER_ID = "userid";
public static final String KEY_USER_MOBILE = "user_mobile";
public static final String KEY_PRODUCT_ID = "product_id";
public static final String KEY_PRODUCT_TRAFFIC = "product_traffic";
public static final String KEY_PRODUCT_TRAFFIC_GRADE = "product_traffic_grade";
public static final String KEY_PRODUCT_TRAFFIC_TYPE = "product_traffic_type";
public static final String KEY_PRODUCT_PUB = "product_pub";
public static final String KEY_USER_REGION = "user_region";
public static final String KEY_TRAVEL_MEMBER_ADULT = "travel_member_adult";
public static final String KEY_TRAVEL_MEMBER_YOUNGER = "travel_member_younger";
public static final String KEY_TRAVEL_MEMBER_BABY = "travel_member_baby";
public static final String KEY_PRODUCT_PRICE = "product_price";
public static final String KEY_HAS_ACTIVITY = "has_activity";
public static final String KEY_PRODUCT_FEE = "product_fee";
public static final String KEY_ORDER_CT = "order_ct";
// ---- region field keys ----
public static final String REGION_KEY_CODE = "region_code";
public static final String REGION_KEY_CODE_DESC = "region_code_desc";
public static final String REGION_KEY_CITY = "region_city";
public static final String REGION_KEY_CITY_DESC = "region_city_desc";
public static final String REGION_KEY_PROVINCE = "region_province";
public static final String REGION_KEY_PROVINCE_DESC = "region_province_desc";
// cache holding all regions (presumably lazily filled by getRegions — confirm)
public static final List regions = new ArrayList<>();
// ---- hotel (pub) field keys ----
public static final String PUB_KEY_ID = "pub_id";
public static final String PUB_KEY_NAME = "pub_name";
public static final String PUB_KEY_STAT = "pub_stat";
public static final String PUB_KEY_GRADE = "pub_grade";
public static final String PUB_KEY_GRADE_DESC = "pub_grade_desc";
public static final String PUB_KEY_AREA_CODE = "pub_area_code";
public static final String PUB_KEY_ADDRESS = "pub_address";
public static final String PUB_KEY_IS_NATIONAL = "is_national";
// mapping from travel product to hotel
public static final Map proMappingPub = new HashMap<>();
// ---- product field keys ----
public static final String PRODUCT_KEY_ID = "product_id";
public static final String SHOP_KEY_ID = "shop_id";
// cache of product info (lazily filled by getProducts when empty)
public static final List products = new ArrayList<>();
// ---- CLI parameter names ----
public static final String KEY_TOPIC = "topic";
public static final String KEY_SOURCE = "source";
public static final String KEY_BEGIN = "begin";
public static final String KEY_END = "end";
public static final String KEY_COUNT = "count";
public static final String KEY_SLEEP = "sleep";
public static final String SOURCE_PRODUCT = "product";
public static final String SOURCE_PAY = "pay";
public static final String TIME_ORDER = "time_order";
// ---- validation bounds for the CLI parameters ----
public static final Integer TIME_RANGE_MIN = 1;
public static final Integer TIME_RANGE_MAX = 120;
public static final Integer COUNT_MIN = 1;
public static final Integer COUNT_MAX = 10000;
public static final Integer SLEEP_MIN = 1;
public static final Integer SLEEP_MAX = 3600 * 1000;
/**
 * Entry point: validates the CLI arguments and, if they are valid, starts
 * producing mock data to the configured Kafka topic.
 *
 * @param args CLI args parsed by QParameterTool (--topic, --source, --count, --sleep)
 */
public static void main(String[] args) {
    // An empty check result means all parameters are valid.
    String checkResult = checkParams(args);
    if (StringUtils.isEmpty(checkResult)) {
        // Continuously fabricate data and produce it to the given Kafka topic.
        chooseFun(args);
    } else {
        // Fix: the original message read "some erros happends".
        System.err.println("some errors happened [" + checkResult + "]");
    }
}
/**
 * Validates the CLI arguments.
 *
 * @param args raw CLI arguments
 * @return an empty string when all parameters are valid, otherwise a
 *         human-readable description of the first problem found
 */
private static String checkParams(String[] args) {
    QParameterTool tools = QParameterTool.fromArgs(args);

    // topic is mandatory
    String topic = tools.get(KEY_TOPIC);
    if (StringUtils.isEmpty(topic)) {
        // Fix: the original message "agrs is Empty" was both a typo and
        // misleading — the failure is specifically a missing topic.
        return "topic is empty";
    }

    // source must name one of the supported generators
    String source = tools.get(KEY_SOURCE);
    if (!SOURCE_PRODUCT.equalsIgnoreCase(source) && !SOURCE_PAY.equalsIgnoreCase(source)) {
        return "source is error .source must be ['product' or 'pay']";
    }

    // Fix: getInt(key) throws when the key is missing (it delegates to
    // getRequired internally), so the original "null == count" branch was
    // unreachable dead code. Probe with has() first so a missing value
    // yields the error string instead of an uncaught RuntimeException.
    if (!tools.has(KEY_COUNT)) {
        return "count is empty";
    }
    int count = tools.getInt(KEY_COUNT);
    if (count > COUNT_MAX || count < COUNT_MIN) {
        return "count is unbound[" + COUNT_MIN + "~" + COUNT_MAX + "]";
    }

    // Same fix for sleep: the "null == sleep" branch could never be taken.
    if (!tools.has(KEY_SLEEP)) {
        return " sleep is empty";
    }
    int sleep = tools.getInt(KEY_SLEEP);
    if (sleep > SLEEP_MAX || sleep < SLEEP_MIN) {
        return "sleep is unbound[" + SLEEP_MIN + "~" + SLEEP_MAX + "]";
    }

    // Empty string signals success to the caller.
    return "";
}
/**
 * Dispatches to the matching data generator based on the "source" argument.
 * (The "pay" branch is intentionally empty: not implemented yet.)
 *
 * @param args CLI args already validated by checkParams
 */
public static void chooseFun(String[] args) {
    // Re-parse the args into the parameter tool.
    QParameterTool params = QParameterTool.fromArgs(args);
    String topic = params.get(KEY_TOPIC);
    String source = params.get(KEY_SOURCE);
    Integer count = params.getInt(KEY_COUNT);
    Integer sleep = params.getInt(KEY_SLEEP);

    if (SOURCE_PRODUCT.equalsIgnoreCase(source)) {
        // generate product order data
        makeTravelProductData(topic, count, sleep);
    } else if (SOURCE_PAY.equalsIgnoreCase(source)) {
        // payment data generation: not implemented
    }
}
/**
 * Fabricates travel product order data. Incomplete: currently only gathers
 * the lookup data (regions, hotels, products); record generation and the
 * Kafka send are not implemented yet.
 *
 * @param topic target Kafka topic
 * @param count number of records to fabricate
 * @param sleep pause between records, in milliseconds
 */
public static void makeTravelProductData(String topic, Integer count, Integer sleep) {
    // time formatting helpers for order timestamps
    String dateFormatter = CommonConstant.FORMATTER_YYYYMMDDHHMMDD;
    String dayFormatter = CommonConstant.FORMATTER_YYYYMMDD;
    ChronoUnit chronoUnit = ChronoUnit.MINUTES;
    ChronoUnit daysChronoUnit = ChronoUnit.DAYS;
    // lookup data
    // 2.1 region info
    // Fix: the original read "Listregions = getRegions();" — the missing
    // space made it an assignment to an undeclared identifier (a compile
    // error; presumably List<RegionDo> before generics were stripped).
    List regions = getRegions();
    // 2.2 hotel info
    Map pubs = getPubMappingPro();
    // 2.3 product info
    List productIDs = getProducts();
}
//旅游产品信息
private static List getProducts() {
if (CollectionUtils.isEmpty(products)){
try{
List



