栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

实时数仓环境搭建

实时数仓环境搭建

  1. 环境准备

1.1虚拟机准备

克隆三台虚拟机(hadoop101、hadoop102、hadoop103),配置好对应主机的网络IP、主机名称、关闭防火墙。

设置hadoop101、hadoop102、hadoop103的主机对应内存分别是:4G、4G、4G

1.2配置免密登录
  1. 配置ssh免密登录

[root@hadoop101 ~]# vim /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.1.101 hadoop101

192.168.1.102 hadoop102

192.168.1.103 hadoop103

[root@hadoop101 ~]# ssh-keygen -t rsa

[root@hadoop101 ~]# ssh-copy-id hadoop101

[root@hadoop101 ~]# ssh-copy-id hadoop102

[root@hadoop101 ~]# ssh-copy-id hadoop103

其余两台机器同样操作一遍

1.3安装jdk
  1. 卸载linux上原有open jdk,其余两台机器同样操作进行卸载

[root@hadoop101 ~]# rpm -qa | grep jdk

java-1.8.0-openjdk-1.8.0.161-2.b14.el7.x86_64

copy-jdk-configs-3.3-2.el7.noarch

java-1.8.0-openjdk-headless-1.8.0.161-2.b14.el7.x86_64

[root@hadoop101 ~]# rpm -e --nodeps java-1.8.0-openjdk-1.8.0.161-2.b14.el7.x86_64

[root@hadoop101 ~]# rpm -e --nodeps copy-jdk-configs-3.3-2.el7.noarch

[root@hadoop101 ~]# rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.161-2.b14.el7.x86_64

  1. 创建软件包存放目录

[root@hadoop101 ~]# mkdir /opt/software

[root@hadoop101 ~]# cd /opt/software/

(3)上传jdk安装包并进行解压,添加环境变量

[root@hadoop101 software]# mkdir /opt/module

[root@hadoop101 software]# tar -zxvf jdk-8u211-linux-x64.tar.gz -C /opt/module/

/opt/module/jdk1.8.0_211

[root@hadoop101 jdk1.8.0_211]# vim /etc/profile

在profile结尾处加上jdk路径

#JAVA_HOME

export JAVA_HOME=/opt/module/jdk1.8.0_211

export PATH=$PATH:$JAVA_HOME/bin

(4)source下

[root@hadoop101 jdk1.8.0_211]# source /etc/profile

[root@hadoop101 jdk1.8.0_211]# java -version

java version "1.8.0_211"

Java(TM) SE Runtime Environment (build 1.8.0_211-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

[root@hadoop101 jdk1.8.0_211]#

  1. 将module包的jdk路径传输到其余两台机器上,并配置jdk环境变量source下

[root@hadoop101 module]# scp -r /opt/module/jdk1.8.0_211/ hadoop102:/opt/module/

[root@hadoop101 module]# scp -r /opt/module/jdk1.8.0_211/ hadoop103:/opt/module/

[root@hadoop101 module]# scp /etc/profile hadoop102:/etc/

[root@hadoop101 module]# scp /etc/profile hadoop103:/etc/

[root@hadoop102 module]# source /etc/profile

[root@hadoop102 module]# java -version

java version "1.8.0_211"

Java(TM) SE Runtime Environment (build 1.8.0_211-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

[root@hadoop103 ~]# source /etc/profile

[root@hadoop103 ~]# java -version

java version "1.8.0_211"

Java(TM) SE Runtime Environment (build 1.8.0_211-b12)

Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

1.4关闭防火墙

[root@hadoop101 ~]# systemctl stop firewalld.service

[root@hadoop101 ~]# systemctl disable firewalld.service

[root@hadoop102 ~]# systemctl stop firewalld.service

[root@hadoop102 ~]# systemctl disable firewalld.service

[root@hadoop103 ~]# systemctl stop firewalld.service

[root@hadoop103 ~]# systemctl disable firewalld.service

  1. 安装Zookeeper 3.5.7

(1)上传压缩包到software文件夹,并进行解压

[root@hadoop101 module]# cd /opt/software/

[root@hadoop101 software]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz  -C /opt/module/

(2)分发到各节点

[root@hadoop101 software]# cd /opt/module/

[root@hadoop101 module]# scp -r apache-zookeeper-3.5.7-bin/ hadoop102:/opt/module/

[root@hadoop101 module]# scp -r apache-zookeeper-3.5.7-bin/ hadoop103:/opt/module/

  1. 在zookeeper目录创建zkData目录

[root@hadoop101 module]# cd apache-zookeeper-3.5.7-bin/

[root@hadoop101 apache-zookeeper-3.5.7-bin]# mkdir zkData

  1. 在zkData目录下创建myid文件,写上对应的编号1并保存

[root@hadoop101 apache-zookeeper-3.5.7-bin]# cd zkData/

[root@hadoop101 zkData]# vim myid

1

(5)分发zkData目录

[root@hadoop101 zkData]# cd ..

[root@hadoop101 apache-zookeeper-3.5.7-bin]# scp -r zkData/ hadoop102:/opt/module/apache-zookeeper-3.5.7-bin/

[root@hadoop101 apache-zookeeper-3.5.7-bin]# scp -r zkData/ hadoop103:/opt/module/apache-zookeeper-3.5.7-bin/

(6)配置zoo.cfg

[root@hadoop101 apache-zookeeper-3.5.7-bin]# cd conf/

[root@hadoop101 conf]# mv zoo_sample.cfg zoo.cfg

[root@hadoop101 conf]# vim zoo.cfg

修改数据存储路径

dataDir=/opt/module/apache-zookeeper-3.5.7-bin/zkData

在文件末尾处增加集群配置

server.1=hadoop101:2888:3888

server.2=hadoop102:2888:3888

server.3=hadoop103:2888:3888

分发zoo.cfg

[root@hadoop101 conf]# scp zoo.cfg hadoop102:/opt/module/apache-zookeeper-3.5.7-bin/conf/

[root@hadoop101 conf]# scp zoo.cfg hadoop103:/opt/module/apache-zookeeper-3.5.7-bin/conf/

(7)修改其余两台机器的myid,分别为2,3

[root@hadoop102 apache-zookeeper-3.5.7-bin]# vim zkData/myid

2

[root@hadoop103 apache-zookeeper-3.5.7-bin]# vim zkData/myid

3

  1. 启动集群

[root@hadoop101 ~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop102~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop103 ~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

  1. 安装Hadoop 3.1.3

3.1HDFS HA搭建
  1. 上传压缩包到software文件夹,并进行解压

[root@hadoop101 module]# cd /opt/software/

[root@hadoop101 software]# tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

  1. 分发opt目录下hadoop文件夹

[root@hadoop101 software]# cd /opt/module/

[root@hadoop101 module]# scp -r hadoop-3.1.3/ hadoop102:/opt/module/

[root@hadoop101 module]# scp -r hadoop-3.1.3/ hadoop103:/opt/module/

(3)配置hadoop环境变量,结尾处加上hadoop路径,其余两台机器同样操作

[root@hadoop101 hadoop-3.1.3]# vim /etc/profile

#HADOOP_HOME

export HADOOP_HOME=/opt/module/hadoop-3.1.3

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

[root@hadoop101 hadoop-3.1.3]# source /etc/profile

[root@hadoop101 hadoop-3.1.3]# hadoop version

Hadoop 3.1.3

Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r ba631c436b806728f8ec2f54ab1e289526c90579

Compiled by ztang on 2019-09-12T02:47Z

Compiled with protoc 2.5.0

From source with checksum ec785077c385118ac91aadde5ec9799

This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar

(4)配置nameservice,编写hdfs-site.xml

[root@hadoop101 hadoop-3.1.3]# cd etc/hadoop/

[root@hadoop101 hadoop]# vim hdfs-site.xml

                dfs.replication

                1

  dfs.nameservices

  mycluster

  dfs.ha.namenodes.mycluster

  nn1,nn2,nn3

  dfs.namenode.rpc-address.mycluster.nn1

  hadoop101:8020

  dfs.namenode.rpc-address.mycluster.nn2

  hadoop102:8020

  dfs.namenode.rpc-address.mycluster.nn3

  hadoop103:8020

  dfs.namenode.http-address.mycluster.nn1

  hadoop101:9870

  dfs.namenode.http-address.mycluster.nn2

  hadoop102:9870

  dfs.namenode.http-address.mycluster.nn3

  hadoop103:9870

  dfs.namenode.shared.edits.dir

  qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster

  dfs.client.failover.proxy.provider.mycluster

  org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

 

        

                dfs.ha.fencing.methods

                sshfence

        

        

         dfs.ha.fencing.ssh.private-key-files

        /root/.ssh/id_rsa

        

        

                dfs.permissions.enable

                false

        

  1. 编写core-site.xml

 

 

  fs.defaultFS

  hdfs://mycluster

 

  

  dfs.journalnode.edits.dir

  /opt/module/hadoop-3.1.3/JN/data

  

  

  

  hadoop.tmp.dir

  /opt/module/hadoop-3.1.3/tmp

 

  1. 在hdfs.xml添加故障自动转移

[root@hadoop101 hadoop]# vim hdfs-site.xml

 

   dfs.ha.automatic-failover.enabled

   true

 

  1. 在core-site.xml添加zookeeper地址

 

  

   ha.zookeeper.quorum

   hadoop101:2181,hadoop102:2181,hadoop103:2181

 

3.2ResouceManager HA搭建
  1. 编写yarn-site.xml

[root@hadoop101 hadoop]# vim yarn-site.xml

 

        yarn.nodemanager.aux-services

        mapreduce_shuffle

    

  yarn.resourcemanager.ha.enabled

  true

  yarn.resourcemanager.cluster-id

  cluster1

  yarn.resourcemanager.ha.rm-ids

  rm1,rm2

  yarn.resourcemanager.hostname.rm1

  hadoop101

  yarn.resourcemanager.hostname.rm2

  hadoop103

  yarn.resourcemanager.webapp.address.rm1

  hadoop101:8088

  yarn.resourcemanager.webapp.address.rm2

  hadoop103:8088

  hadoop.zk.address

  hadoop101:2181,hadoop102:2181,hadoop103:2181

 

    

        yarn.resourcemanager.recovery.enabled

        true

 

     yarn.resourcemanager.store.class     

   org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore

3.3启动集群
  1. 配置workers(老版本为slaves)

[root@hadoop101 hadoop]# vim workers

hadoop101

hadoop102

hadoop103

(2)分发配置文件

[root@hadoop101 hadoop]# cd ..

[root@hadoop101 etc]# scp -r hadoop/ hadoop102:/opt/module/hadoop-3.1.3/etc/

[root@hadoop101 etc]# scp -r hadoop/ hadoop103:/opt/module/hadoop-3.1.3/etc/

(3)在各台机器上启动journalnode服务

[root@hadoop101 hadoop-3.1.3]# sbin/hadoop-daemon.sh start journalnode

[root@hadoop102 hadoop-3.1.3]# sbin/hadoop-daemon.sh start journalnode

[root@hadoop103 hadoop-3.1.3]# sbin/hadoop-daemon.sh start journalnode

(4)在nn1上对namenode进行格式化

[root@hadoop101 hadoop-3.1.3]# bin/hdfs namenode -format

(5)在start-dfs.sh,stop-dfs.sh中配置root用户,顶部配置以下内容

[root@hadoop101 hadoop-3.1.3]# vim sbin/start-dfs.sh

HDFS_DATANODE_USER=root

HADOOP_SECURE_DN_USER=hdfs

HDFS_NAMENODE_USER=root

HDFS_SECONDARYNAMENODE_USER=root

HDFS_JOURNALNODE_USER=root

HDFS_ZKFC_USER=root

[root@hadoop101 hadoop-3.1.3]# vim sbin/stop-dfs.sh

HDFS_DATANODE_USER=root

HADOOP_SECURE_DN_USER=hdfs

HDFS_NAMENODE_USER=root

HDFS_SECONDARYNAMENODE_USER=root

HDFS_JOURNALNODE_USER=root

HDFS_ZKFC_USER=root

(6)在start-yarn.sh,stop-yarn.sh中配置root用户,顶部配置以下内容

[root@hadoop101 hadoop-3.1.3]# vim sbin/start-yarn.sh

YARN_RESOURCEMANAGER_USER=root

HADOOP_SECURE_DN_USER=yarn

YARN_NODEMANAGER_USER=root

[root@hadoop101 hadoop-3.1.3]# vim sbin/stop-yarn.sh

YARN_RESOURCEMANAGER_USER=root

HADOOP_SECURE_DN_USER=yarn

YARN_NODEMANAGER_USER=root

(7)编辑hadoop-env.sh,解开注释,添加JAVA_HOME

[root@hadoop101 hadoop-3.1.3]# vim etc/hadoop/hadoop-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_211

[root@hadoop102 hadoop-3.1.3]# vim etc/hadoop/hadoop-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_211

[root@hadoop103 hadoop-3.1.3]# vim etc/hadoop/hadoop-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_211

(8)分发以上.sh文件

[root@hadoop101 hadoop-3.1.3]# scp -r sbin/ hadoop102:/opt/module/hadoop-3.1.3/

[root@hadoop101 hadoop-3.1.3]# scp -r sbin/ hadoop103:/opt/module/hadoop-3.1.3/

(9)同步,启动nn1的namenode,在 nn2和nn3上进行同步

[root@hadoop101 hadoop-3.1.3]# sbin/hadoop-daemon.sh start namenode

[root@hadoop102 hadoop-3.1.3]# bin/hdfs namenode -bootstrapStandby

[root@hadoop103 hadoop-3.1.3]# bin/hdfs namenode -bootstrapStandby

[root@hadoop102 hadoop-3.1.3]# sbin/hadoop-daemon.sh start namenode

[root@hadoop103 hadoop-3.1.3]# sbin/hadoop-daemon.sh start namenode

(10)关闭所有hdfs服务

[root@hadoop101 hadoop-3.1.3]# sbin/stop-all.sh

(11)初始化HA在Zookeeper中状态:

[root@hadoop101 hadoop-3.1.3]# bin/hdfs zkfc -formatZK

(12)启动集群服务

[root@hadoop101 hadoop-3.1.3]# sbin/start-all.sh

  1. 安装MySql

4.1安装MySql服务端

(1)卸载MySql依赖,虽然机器上没有装MySql,但是这一步不可少

[root@hadoop101 software]# yum remove mysql-libs

  1. 下载依赖并安装

[root@hadoop101 software]# yum install libaio

[root@hadoop101 software]# yum -y install autoconf

[root@hadoop101 software]# wget https://downloads.mysql.com/archives/get/p/23/file/MySQL-shared-compat-5.6.24-1.el6.x86_64.rpm

[root@hadoop101 software]# wget https://downloads.mysql.com/archives/get/p/23/file/MySQL-shared-5.6.24-1.el7.x86_64.rpm

[root@hadoop101 software]# rpm -ivh MySQL-shared-5.6.24-1.el7.x86_64.rpm

[root@hadoop101 software]# rpm -ivh MySQL-shared-compat-5.6.24-1.el6.x86_64.rpm

  1. 上传mysql-libs.zip,并进行解压

[root@hadoop101 software]# yum install unzip

[root@hadoop101 software]# unzip mysql-libs.zip

(4)进入到mysql-libs文件夹下

[root@hadoop101 software]# cd mysql-libs/

[root@hadoop101 mysql-libs]# ls

MySQL-client-5.6.24-1.el6.x86_64.rpm  mysql-connector-java-5.1.27.tar.gz  MySQL-server-5.6.24-1.el6.x86_64.rpm

(5)安装MySql服务端

[root@hadoop101 mysql-libs]#  rpm -ivh MySQL-server-5.6.24-1.el6.x86_64.rpm

(6)查看生成的随机密码

[root@hadoop101 mysql-libs]# cat /root/.mysql_secret

# The random password set for the root user at Sun Feb 23 12:01:22 2020 (local time): 8n2FEY8yf4vBMmL

(7)查看MySql服务状态

[root@hadoop101 mysql-libs]# service mysql status

 ERROR! MySQL is not running

  1. 启动MySql

[root@hadoop101 mysql-libs]# service mysql start

Starting MySQL....... SUCCESS!

4.2安装MySql客户端

(1)安装MySql客户端

[root@hadoop101 mysql-libs]# rpm -ivh MySQL-client-5.6.24-1.el6.x86_64.rpm

(2)登录MySql

[root@hadoop101 mysql-libs]# mysql -uroot -p8n2FEY8yf4vBMmL

(3)修改密码

mysql> SET PASSWORD=PASSWORD('123456');

(4)退出MySql

mysql> exit;

4.3配置User表访问权限

(1)登录MySql,访问库mysql

[root@hadoop101 mysql-libs]# mysql -uroot -p123456

mysql> show databases;

+--------------------+

| Database           |

+--------------------+

| information_schema |

| mysql              |

| performance_schema |

| test               |

+--------------------+

mysql> use mysql

mysql> show tables;

  1. 修改User表

mysql> select User, Host, Password from user;

mysql> update user set host='%' where host='localhost';

  1. 删除root用户其他的host

mysql> delete from user where host!='%';

  1. 刷新

mysql> flush privileges;

  1. 退出

mysql> exit;

  1. 安装Hive 3.1.2
  1. 上传hive压缩包,并进行解压

[root@hadoop101 software]# tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/module/

  1. 拷贝MySql驱动到hive lib下

[root@hadoop101 software]# cd mysql-libs/

[root@hadoop101 mysql-libs]# tar -zxvf mysql-connector-java-5.1.27.tar.gz

[root@hadoop101 mysql-libs]# cd mysql-connector-java-5.1.27/

[root@hadoop101 mysql-connector-java-5.1.27]# cp mysql-connector-java-5.1.27-bin.jar /opt/module/apache-hive-3.1.2-bin/lib/

  1. 配置hive元数据到MySql

[root@hadoop101 mysql-connector-java-5.1.27]# cd /opt/module/apache-hive-3.1.2-bin/conf/

[root@hadoop101 conf]# vim hive-site.xml

  javax.jdo.option.ConnectionURL

  jdbc:mysql://hadoop101:3306/metastore?createDatabaseIfNotExist=true

  JDBC connect string for a JDBC metastore

  javax.jdo.option.ConnectionDriverName

  com.mysql.jdbc.Driver

  Driver class name for a JDBC metastore

  javax.jdo.option.ConnectionUserName

  root

  username to use against metastore database

  javax.jdo.option.ConnectionPassword

  123456

  password to use against metastore database

       

         hive.metastore.warehouse.dir

         /user/hive/warehouse

         location of default database for the warehouse

       

      

hive.cli.print.header

true

hive.cli.print.current.db

true

   

      hive.metastore.schema.verification

      false

   

   

      datanucleus.schema.autoCreateAll

      true

   

  

      hive.metastore.uris

      thrift://hadoop101:9083

        

                 hive.server2.thrift.port

                 10000

        

        

                hive.server2.thrift.bind.host

                hadoop101

        

    hive.metastore.event.db.notification.api.auth

    false

  

    hive.server2.active.passive.ha.enable

    true

  

  1. 配置hive环境变量,在profile结尾处加上以下内容

[root@hadoop101 apache-hive-3.1.2-bin]# vim /etc/profile

#HIVE_HOME

export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin

export PATH=$PATH:$HIVE_HOME/bin

[root@hadoop101 apache-hive-3.1.2-bin]# source /etc/profile

  1. 替换hive中的guava.jar

[root@hadoop101 apache-hive-3.1.2-bin]# cd lib/

[root@hadoop101 lib]# ls |grep guava

guava-19.0.jar

jersey-guava-2.25.1.jar

显示版本号为19.0,再次进入hadoop中查看对应版本

[root@hadoop101 lib]# cd /opt/module/hadoop-3.1.3/share/hadoop/common/lib/

[root@hadoop101 lib]# ls |grep guava

guava-27.0-jre.jar

listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar

版本号为27.0,删除hive原有guava的jar包并将hadoop中的guava-27.0-jre.jar复制过去

[root@hadoop101 lib]# cp guava-27.0-jre.jar /opt/module/apache-hive-3.1.2-bin/lib/

[root@hadoop101 lib]# cd /opt/module/apache-hive-3.1.2-bin/lib/

[root@hadoop101 lib]# ls |grep guava

guava-19.0.jar

guava-27.0-jre.jar

jersey-guava-2.25.1.jar

[root@hadoop101 lib]# rm -f guava-19.0.jar

(6)启动元数据服务,后台运行服务

注意hive 2.x版本以上需要启动两个服务 metastore和hiveserver2 否则会报错Exception in thread "main" java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

[root@hadoop101 apache-hive-3.1.2-bin]# nohup hive --service metastore >metastore.log 2>&1 &

[root@hadoop101 apache-hive-3.1.2-bin]# nohup hive --service hiveserver2 >hiveserver2.log 2>&1 &

  1. 启动hive

[root@hadoop101 apache-hive-3.1.2-bin]# hive

  1. 安装Kafka_2.11-2.4.0
  1. 上传压缩包并解压,并进行解压

[root@hadoop101 software]# tar -zxvf kafka_2.11-2.4.0.tgz -C /opt/module/

  1. 进入kafka目录,创建log日志文件夹

[root@hadoop101 software]# cd /opt/module/kafka_2.11-2.4.0/

[root@hadoop101 kafka_2.11-2.4.0]# mkdir logs

(3)修改配置文件

[root@hadoop101 kafka_2.11-2.4.0]# cd config/

[root@hadoop101 config]# vim server.properties

输入以下内容:

#broker的全局唯一编号,不能重复

broker.id=0

#删除topic功能使能

delete.topic.enable=true

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的现成数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志存放的路径

log.dirs=/opt/module/kafka_2.11-2.4.0/logs

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除

#默认数据保留7天注释

#log.retention.hours=168

#配置连接Zookeeper集群地址

zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka_2.4

注意:zookeeper.connect之所以在zk地址后再加上/kafka_2.4,目的在于注册信息不是直接注册到zk根目录下,而是注册到 /kafka_2.4目录下。对应的kafka命令zk参数也得跟着变

(4)分发到其他节点并对应修改broker.id。102,103节点分别对应1,2

[root@hadoop101 kafka_2.11-2.4.0]# cd /opt/module/

[root@hadoop101 module]# scp -r /opt/module/kafka_2.11-2.4.0/ hadoop102:/opt/module/

[root@hadoop101 module]# scp -r /opt/module/kafka_2.11-2.4.0/ hadoop103:/opt/module/

[root@hadoop102 config]# pwd

/opt/module/kafka_2.11-2.4.0/config

[root@hadoop102 config]# vim server.properties

broker.id=1

[root@hadoop103 config]# pwd

/opt/module/kafka_2.11-2.4.0/config

[root@hadoop103 config]# vim server.properties

broker.id=2

(5)启动zk集群,再启动kafka

[root@hadoop101 module]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop102 module]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop103 module]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop101 module]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

[root@hadoop102 config]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

[root@hadoop103 config]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

(6)启动后,可以去zk里看下注册信息

[root@hadoop101 module]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] ls /

[hadoop-ha, kafka_2.4, rmstore, yarn-leader-election, zookeeper]

注册到kafka_2.4中,而不是根目录,可以继续查看里面信息

[zk: localhost:2181(CONNECTED) 1] ls /kafka_2.4

[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

(7)创建topic命令,因为注册信息不是在根目录,所以zk参数得跟着变

[root@hadoop101 module]# /opt/module/kafka_2.11-2.4.0/bin/kafka-topics.sh --zookeeper hadoop101:2181/kafka_2.4  --create --replication-factor 2 --partitions 3 --topic test

Created topic test.

  1. 其他配置与总结

7.1设置物理核和虚拟核占比

(1)当前虚拟机处理器为2核,将其虚拟化为4核,使比值为1比2,修改

yarn.nodemanager.resource.cpu-vcores参数,修改为4

[root@hadoop101 module]# cd /opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# vim yarn-site.xml

  

     

         yarn.nodemanager.resource.cpu-vcores

         4

          

7.2修改单个容器下最大cpu资源申请

任务提交时,比如spark-submit,executor-core参数不得超过4个

[root@hadoop101 hadoop]# vim yarn-site.xml

     

     

       yarn.scheduler.maximum-allocation-vcores

       4

     

7.3设置每个任务容器内存大小和节点内存大小

控制任务提交每个容器内存的上限,以及yarn所可以占用的内存上限,例如当前虚拟机内存为4g那么控制yarn的每个节点内存不能超过4g

[root@hadoop101 hadoop]# vim yarn-site.xml

 

     

      yarn.scheduler.maximum-allocation-mb

      4096

     

    

    

   yarn.nodemanager.resource.memory-mb

   7168

   

7.4配置容量调度器队列

容量调度器默认root队列,现在改为flink, hive两个队列,并设置flink队列资源占比为80%,hive为20%

[root@hadoop101 hadoop]# vim yarn-site.xml

 

   

      yarn.resourcemanager.scheduler.class

      org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

  

  

    yarn.scheduler.capacity.root.queues

    default

  

  

  yarn.scheduler.capacity.root.default.capacity

  100

  

 

    yarn.scheduler.capacity.root.default.queues

    flink,hive

 

 

   yarn.scheduler.capacity.root.default.flink.capacity

   80

 

    yarn.scheduler.capacity.root.default.hive.capacity

   20

  

7.5配置垃圾回收站

回收站保留半小时数据

[root@hadoop101 hadoop]# vim core-site.xml

fs.trash.interval

30

7.6配置历史服务器

[root@hadoop101 hadoop]# vim yarn-site.xml

  

        

                yarn.nodemanager.pmem-check-enabled

                false

        

        

        

                yarn.nodemanager.vmem-check-enabled

                false

        

 

        

                yarn.log-aggregation-enable

                true

        

        

                yarn.nodemanager.remote-app-log-dir

                /opt/module/hadoop-3.1.3/yarn-logs

        

  

        

                yarn.log-aggregation.retain-seconds

                604800

        

                yarn.log.server.url

                http://hadoop102:19888/jobhistory/logs

        

修改mapred-site.xml

 

                mapreduce.framework.name

                yarn

                指定mr框架为yarn方式

        

      

        

                mapreduce.jobhistory.address

                hadoop102:10020

                历史服务器端口号

        

         

                mapreduce.jobhistory.webapp.address

                hadoop102:19888

                历史服务器的WEB UI端口号

        

7.7总结

(1)分发core-site.xml yarn.xml

[root@hadoop101 hadoop]# scp yarn-site.xml hadoop102:/opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# scp yarn-site.xml hadoop103:/opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# scp core-site.xml hadoop102:/opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# scp core-site.xml hadoop103:/opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# scp mapred-site.xml hadoop102:/opt/module/hadoop-3.1.3/etc/hadoop/

mapred-site.xml

[root@hadoop101 hadoop]# scp mapred-site.xml hadoop103:/opt/module/hadoop-3.1.3/etc/hadoop/

mapred-site.xml

(2)重启集群,观察 yarn,8088页面,最大内存,最大vcore,容器可调度最大内存都已发生变化

(3)所有启动命令

启动zookeeper

[root@hadoop101 hadoop]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop102 hadoop]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop103 hadoop]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

启动kafka

[root@hadoop101 hadoop]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

[root@hadoop102 hadoop]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

[root@hadoop103 hadoop]# /opt/module/kafka_2.11-2.4.0/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.11-2.4.0/config/server.properties

启动hive服务

[root@hadoop101 apache-hive-3.1.2-bin]# nohup hive --service metastore >metastore.log 2>&1 &

[root@hadoop101 apache-hive-3.1.2-bin]# nohup hive --service hiveserver2 >hiveserver2.log 2>&1 &

启动hue

[root@hadoop102 hue-master]# build/env/bin/supervisor

启动hdfs集群

[root@hadoop101 hadoop]# start-all.sh

启动hadoop历史服务器

[root@hadoop102 hadoop]# mr-jobhistory-daemon.sh start historyserver

  1. 安装Flink

8.1 Yarn模式

  1. 上传压缩包到hadoop103进行解压

[root@hadoop103 ~]# mkdir -p /opt/software

[root@hadoop103 software]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz -C /opt/module/

[root@hadoop103 software]# cd /opt/module/flink-1.10.0/

(2)进入到lib目录下,上传flink-shaded-hadoop-2-uber-2.8.3-10.0

[root@hadoop103 flink-1.10.0]# cd lib/

[root@hadoop103 lib]# ls

flink-dist_2.11-1.10.0.jar                 flink-table_2.11-1.10.0.jar        log4j-1.2.17.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar  flink-table-blink_2.11-1.10.0.jar  slf4j-log4j12-1.7.15.jar

(3)编辑flink-conf.yaml

jobmanager.rpc.address: hadoop103

jobmanager.rpc.port: 6123

jobmanager.heap.size: 1024m

jobmanager.execution.failover-strategy: region

rest.port: 8081

web.submit.enable: true

env.java.home: /opt/module/jdk1.8.0_211

env.java.opts: -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:-UseGCOverheadLimit -XX:+HeapDumponOutOfMemoryError -XX:HeapDumpPath=/tmp/wormhole/gc

yarn.application-attempts: 2

(4)启动yarn-session

[root@hadoop103 lib]# cd ..

[root@hadoop103 flink-1.10.0]# bin/yarn-session.sh --queue flink

  1. 查看对应地址

8.2高可用(HA)

JobManager协调每个Flink部署。它负责调度和资源管理。

默认情况下,每个Flink集群只有一个JobManager实例。这将创建一个单点故障:如果一个JobManager崩溃,则无法提交任何新程序,并且正在运行的程序也会失败。

使用JobManager高可用性,可以从JobManager故障中恢复,从而消除单点故障问题。

下面介绍 YARN模式下的高可用

在运行YARN模式高可用情况下,不会起多个JobManager,只会运行一个JobManager实例,当实例出现故障时,YARN会重新启动该实例

(1)修改yarn-site.xml,修改最大重试次数,默认值为2

[root@hadoop101 hadoop]# vim /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml

  yarn.resourcemanager.am.max-attempts

  4

  

    The maximum number of application master execution attempts.

  

(2)分发到其他机器上

[root@hadoop101 hadoop]# scp yarn-site.xml hadoop102:/opt/module/hadoop-3.1.3/etc/hadoop/

[root@hadoop101 hadoop]# scp yarn-site.xml hadoop103:/opt/module/hadoop-3.1.3/etc/hadoop/

(3)修改flink-conf.yaml,添加重试次数

[root@hadoop103 conf]# vim flink-conf.yaml

yarn.application-attempts: 4

注意:yarn.resourcemanager.am.max-attempts是应用程序重新启动的上限,因此Flink中设置的应用程序尝试次数不能超过启动YARN时YARN集群的设置。

(4)配置zookeeper地址,修改flink-conf.yaml

[root@hadoop103 conf]# vim flink-conf.yaml

high-availability: zookeeper

 high-availability.storageDir: hdfs://mycluster/flink/ha/

high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181

high-availability.zookeeper.path.root: /flink

(5)启动集群

[root@hadoop101 ~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop102 ~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop103 ~]# /opt/module/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

[root@hadoop101 ~]# /opt/module/hadoop-3.1.3/sbin/start-dfs.sh

[root@hadoop103 ~]# /opt/module/hadoop-3.1.3/sbin/start-yarn.sh

(6)启动flink

[root@hadoop103 flink-1.10.0]# bin/yarn-session.sh --queue flink

(7)配置flink环境变量

[root@hadoop103 flink-1.10.0]# vim /etc/profile

#Flink_HOME

export Flink_HOME=/opt/module/flink-1.10.0

export PATH=$PATH:$Flink_HOME/bin

[root@hadoop103 flink-1.10.0]# source /etc/profile

启动成功

(8)如果zookeeper使用kerberos安全模式运行,则需配置以下参数(可选)

[root@hadoop103 flink-1.10.1]# vim conf/flink-conf.yaml

zookeeper.sasl.service-name: zookeeper

zookeeper.sasl.login-context-name: Client

  1. 安装Hbase

9.1 概述

 Hbase是一个基于Hadoop的k,v数据库,是一个分布式的,可伸缩的大数据存储数据库。

Hbase适用于实时读/写访问,模仿了Google的BigTable。

9.2 特性

(1)线性和模块化可扩展性

(2)严格一致的读写

(3)表的自动化和可配置切片

(4)RegionServer之间的自动故障转移支持

(5)通过Hbase表备份Hadoop MapReduce作业

(6)提供简单易用的Java Api

(7)块缓存和布隆过滤器用于实时查询

9.3 架构图

图6-1 Hbase架构原理

9.4 完全分布是安装

   在完全分布式配置中,集群包含多个节点,每个节点运行一个或多个Hbase守护进程。其中包括主实例和备份Master实例,多个Zookeeper节点和多个RegionServer节点。

Node Name

Master

Zookeeper

RegionServer

Hadoop101

yes

yes

yes

Hadoop102

backup

yes

yes

Hadoop103

no

yes

yes

  1. 上传并解压hbase-2.2.4-bin.tar.gz

[root@hadoop101 hadoop]# cd /opt/software/

[root@hadoop101 software]# tar -zxvf hbase-2.2.4-bin.tar.gz -C /opt/module/

(2)修改conf/regionservers,删除localhost,修改对应各主机域名或ip

[root@hadoop101 software]# cd /opt/module/hbase-2.2.4/

[root@hadoop101 hbase-2.2.4]# vim conf/regionservers

hadoop101

hadoop102

hadoop103

(3)在conf创建一个文件名为backup-masters,并且在这文件里添加hadoop102的域名

[root@hadoop101 hbase-2.2.4]# vim conf/backup-masters

hadoop102

(4)修改conf/hbase-site.xml文件

[root@hadoop101 hbase-2.2.4]# cd conf/

[root@hadoop101 conf]# vim hbase-site.xml

  hbase.rootdir

  hdfs://mycluster/hbase

  hbase.cluster.distributed

  true

  hbase.master.port

  16000

  hbase.zookeeper.property.dataDir

  /home/root/zookeeper

  hbase.zookeeper.quorum

  hadoop101,hadoop102,hadoop103


hbase.unsafe.stream.capability.enforce
false

(5)修改hbase-env.sh。声明jdk路径,并且将hbase自带的zookeeper设置为false

[root@hadoop101 conf]# vim hbase-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_211

export HBASE_MANAGES_ZK=false

(6)拷贝hdfs-site.xml到hbase conf下

[root@hadoop101 conf]# cp /opt/module/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /opt/module/hbase-2.2.4/conf/

(7)分发hbase到其他节点

[root@hadoop101 module]# scp -r hbase-2.2.4/ hadoop102:/opt/module/

[root@hadoop101 module]# scp -r hbase-2.2.4/ hadoop103:/opt/module/

(8)配置hbase环境变量

[root@hadoop101 module]# vim /etc/profile

#Hbase_HOME

export Hbase_HOME=/opt/module/hbase-2.2.4

export PATH=$PATH:$Hbase_HOME/bin

[root@hadoop101 module]# source /etc/profile

[root@hadoop102 module]# vim /etc/profile

#Hbase_HOME

export Hbase_HOME=/opt/module/hbase-2.2.4

export PATH=$PATH:$Hbase_HOME/bin

[root@hadoop102 module]# source /etc/profile

[root@hadoop103 module]# vim /etc/profile

#Hbase_HOME

export Hbase_HOME=/opt/module/hbase-2.2.4

export PATH=$PATH:$Hbase_HOME/bin

[root@hadoop103 module]# source /etc/profile

(9)启动hbase

[root@hadoop101 module]# start-hbase.sh

  1. Web UI访问,http://hadoop101:16010

  1. 实时数仓准备工作

10.1表模型
  1. 宽表

  1. 基础表

10.2创建对应topic

[root@hadoop101 module]# cd /opt/module/kafka_2.11-2.4.0/

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic basewebsite

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic basead

Created topic basead.

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic member

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic memberpaymoney

Created topic memberpaymoney.

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic memberregtype

Created topic memberregtype.

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic membervip

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic dwdmember

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic dwdmemberpaymoney

[root@hadoop101 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 3  --topic dwdmemberregtype

10.3创建对应Hbase表

[root@hadoop101 kafka_2.11-2.4.0]# hbase shell

hbase(main):001:0>  create_namespace 'education'

hbase(main):002:0> create 'education:dwd_basewebsite',{NAME => 'info', VERSIONS => '3', TTL => 'FOREVER'}

hbase(main):003:0> create 'education:dwd_basead',{NAME => 'info', VERSIONS => '3', TTL => 'FOREVER'}

hbase(main):004:0> create 'education:dwd_membervip',{NAME => 'info', VERSIONS => '3', TTL => 'FOREVER'}

hbase(main):005:0> create 'education:dim_member',{NAME=>'info',VERSIONS => '3', TTL => 'FOREVER'},{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

  1. 编写代码

11.1构建项目

(1)采用官方通过maven构建的项目的方式,首先保证本机配置了maven环境变量

(2)构建项目

mvn archetype:generate     -DarchetypeGroupId=org.apache.flink    -DarchetypeArtifactId=flink-quickstart-scala    -DarchetypeVersion=1.10.0

(3)输入相应pom配置信息后,通过ide打开项目

11.2配置pom.xml

(1)需要替换原有build工具,让项目支持既能打包java代码,又能打包scala代码


         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    4.0.0

    com.atguigu.education-flink
    education-flink-online
    1.0-SNAPSHOT
    jar

    
        UTF-8
        1.10.0
        2.11
        2.11.12
    


    
        
        
        
            org.apache.flink
            flink-scala_${scala.binary.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
        


        
            org.apache.flink
            flink-statebackend-rocksdb_${scala.binary.version}
            ${flink.version}
        

        
        
            org.scala-lang
            scala-library

            ${scala.version}
        

        
        
            com.alibaba
            fastjson
            1.2.68
        


        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        


        
            org.apache.flink
            flink-scala_2.11

            1.10.0
        

        
            org.apache.flink
            flink-streaming-scala_2.11
            1.10.0
        

        
            org.apache.flink
            flink-clients_2.11
            1.10.0
        

        
        
            org.apache.hbase
            hbase-server
            2.2.4
        

        
        
            org.apache.hbase
            hbase-client
            2.2.4
        


        
        
            com.google.guava
            guava
            29.0-jre
        

        
            org.apache.flink
            flink-runtime-web_2.11

            ${flink.version}
        

        

        
            com.alibaba
            druid
            provided
            1.1.16
        


        
        
            org.apache.kudu
            kudu-client
            1.10.0
        







        

        
        












    


    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.6.1
                
                
                    1.8
                    1.8
                

            


            
                org.scala-tools
                maven-scala-plugin
                2.15.1
                
                    
                        compile-scala
                        
                            add-source
                            compile
                        

                    

                    
                        test-compile-scala
                        
                            add-source
                            testCompile
                        

                    

                

            

            
                org.apache.maven.plugins
                maven-assembly-plugin
                
                    
                        
                        

                    
                    
                        jar-with-dependencies
                    

                

            

            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        
                        
                            compile
                            testCompile
                        

                    

                

            


            
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.0.0
                
                    
                        make-assembly
                        package
                        
                            single
                        

                    

                

            

        

    

11.3编写生产者代码
  1. 模拟数据收集,编写kafka生产者代码,对应6张表6个topic,所以编写 6个生产者代码

11.3.1广告基础信息表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class baseAdLogKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0; i < 10; i++) {
            GdmbaseAd gdmbaseAd = GdmbaseAdLog.generateLog(String.valueOf(i));
            String jsonString = JSON.toJSonString(gdmbaseAd);
            producer.send(new ProducerRecord("basead", jsonString));
        }
        producer.flush();
        producer.close();
    }

    public static class GdmbaseAd {
        private String adid;
        private String adname;
        private String dn;

        public String getAdid() {
            return adid;
        }

        public void setAdid(String adid) {
            this.adid = adid;
        }

        public String getAdname() {
            return adname;
        }

        public void setAdname(String adname) {
            this.adname = adname;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }
    }

    public static class GdmbaseAdLog {
        public static GdmbaseAd generateLog(String adid) {
            GdmbaseAd basead = new GdmbaseAd();
            basead.setAdid(adid);
            basead.setAdname("注册弹窗广告" + adid);
            basead.setDn("webA");
            return basead;
        }
    }
}

11.3.2用户基础表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.Random;

public class baseMemberKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0; i < 1000000; i++) {
            GdmMember gdmMember = MemberLog.generateLog(String.valueOf(i));
            String jsonString = JSON.toJSonString(gdmMember);
            producer.send(new ProducerRecord("member", jsonString));
        }
        producer.flush();
        producer.close();
    }

    public static class GdmMember {

        private String uid;  //用户id
        private String password;  //密码
        private String email;
        private String username; //用户名
        private String fullname; //用户名
        private String birthday;
        private String phone;
        private String qq;
        private String ad_id;
        private String unitname;
        private String mailaddr;
        private String zipcode;
        private String kjlevel;
        private String register;
        private String memberlevel;
        private String paymoney;
        private String userip;
        private String regupdatetime;
        private String lastlogin;
        private String iconurl;
        private String dt;
        private String dn;


        public String getFullname() {
            return fullname;
        }

        public void setFullname(String fullname) {
            this.fullname = fullname;
        }

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }

        public String getEmail() {
            return email;
        }

        public void setEmail(String email) {
            this.email = email;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getBirthday() {
            return birthday;
        }

        public void setBirthday(String birthday) {
            this.birthday = birthday;
        }

        public String getPhone() {
            return phone;
        }

        public void setPhone(String phone) {
            this.phone = phone;
        }

        public String getQq() {
            return qq;
        }

        public void setQq(String qq) {
            this.qq = qq;
        }

        public String getAd_id() {
            return ad_id;
        }

        public void setAd_id(String ad_id) {
            this.ad_id = ad_id;
        }

        public String getUnitname() {
            return unitname;
        }

        public void setUnitname(String unitname) {
            this.unitname = unitname;
        }

        public String getMailaddr() {
            return mailaddr;
        }

        public void setMailaddr(String mailaddr) {
            this.mailaddr = mailaddr;
        }

        public String getZipcode() {
            return zipcode;
        }

        public void setZipcode(String zipcode) {
            this.zipcode = zipcode;
        }

        public String getKjlevel() {
            return kjlevel;
        }

        public void setKjlevel(String kjlevel) {
            this.kjlevel = kjlevel;
        }

        public String getRegister() {
            return register;
        }

        public void setRegister(String register) {
            this.register = register;
        }

        public String getMemberlevel() {
            return memberlevel;
        }

        public void setMemberlevel(String memberlevel) {
            this.memberlevel = memberlevel;
        }

        public String getPaymoney() {
            return paymoney;
        }

        public void setPaymoney(String paymoney) {
            this.paymoney = paymoney;
        }

        public String getUserip() {
            return userip;
        }

        public void setUserip(String userip) {
            this.userip = userip;
        }

        public String getRegupdatetime() {
            return regupdatetime;
        }

        public void setRegupdatetime(String regupdatetime) {
            this.regupdatetime = regupdatetime;
        }

        public String getLastlogin() {
            return lastlogin;
        }

        public void setLastlogin(String lastlogin) {
            this.lastlogin = lastlogin;
        }

        public String getIconurl() {
            return iconurl;
        }

        public void setIconurl(String iconurl) {
            this.iconurl = iconurl;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }

        public String getDt() {
            return dt;
        }

        public void setDt(String dt) {
            this.dt = dt;
        }
    }

    public static class MemberLog {


        private static String[] dns = new String[]{"webA", "webB", "webC"};
        private static String[] type = new String[]{"insert", "update"};
        private static int[][] range = {{607649792, 608174079},//36.56.0.0-36.63.255.255
                {1038614528, 1039007743},//61.232.0.0-61.237.255.255
                {1783627776, 1784676351},//106.80.0.0-106.95.255.255
                {2035023872, 2035154943},//121.76.0.0-121.77.255.255
                {2078801920, 2079064063},//123.232.0.0-123.235.255.255
                {-1950089216, -1948778497},//139.196.0.0-139.215.255.255
                {-1425539072, -1425014785},//171.8.0.0-171.15.255.255
                {-1236271104, -1235419137},//182.80.0.0-182.92.255.255
                {-770113536, -768606209},//210.25.0.0-210.47.255.255
                {-569376768, -564133889}, //222.16.0.0-222.95.255.255
        };

        public static GdmMember generateLog(String uid) {
            GdmMember member = new GdmMember();
            Random rand = new Random();
            member.setAd_id(rand.nextInt(10) + "");
            String birthday = new SimpleDateFormat("yyyy-MM-dd")
                    .format(RondomDate.randomDate("1960-01-01 00:00:00", "2000-01-01 00:00:00"));
            member.setDt(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDateTime.now().minusDays(8)));
            member.setDn(dns[0]);
            member.setUid(uid);
            member.setPassword("123456");
            member.setEmail("test@126.com");
            member.setFullname("王" + uid);
            member.setPhone("13711235451");
            member.setBirthday(birthday);
            member.setQq("10000");
            member.setUnitname("-");
            member.setMailaddr("-");
            member.setZipcode("-");
//            String registerdata = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//                    .format(RondomDate.randomDate("2019-06-30 10:00:00", "2019-06-30 11:00:00"));
            member.setRegister(String.valueOf(System.currentTimeMillis()));
            member.setMemberlevel((rand.nextInt(8) + 1) + "");
            member.setPaymoney("-");
            member.setUserip("-");
            int index = rand.nextInt(10);
            String ip = num2ip(range[index][0]
                    + new Random().nextInt(range[index][1] - range[index][0]));
            member.setUserip(ip);
            member.setRegupdatetime("-");
            member.setLastlogin("-");
            member.setIconurl("-");
            return member;
        }

        public static String num2ip(int ip) {
            int[] b = new int[4];
            String x = "";

            b[0] = (int) ((ip >> 24) & 0xff);
            b[1] = (int) ((ip >> 16) & 0xff);
            b[2] = (int) ((ip >> 8) & 0xff);
            b[3] = (int) (ip & 0xff);
            x = Integer.toString(b[0]) + "." + Integer.toString(b[1]) + "."
                    + Integer.toString(b[2]) + "." + Integer.toString(b[3]);

            return x;
        }
    }
}

11.3.3用户支付金额表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class baseMemberPayMoney {
    //创建池
    public static BlockingQueue> queue = new linkedBlockingDeque<>(10);


    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0; i < 1000000; i++) {
            GdmPcenterMemPaymoney memPaymoney = GdmPcenterMemPayMoneyLog.generateLog(String.valueOf(i));
            String jsonString = JSON.toJSonString(memPaymoney);
            producer.send(new ProducerRecord("memberpaymoney", jsonString));
        }
        producer.flush();
        producer.close();
    }

    public static class GdmPcenterMemPaymoney {

        private String uid;
        private String paymoney;
        private String vip_id;
        private String updatetime;
        private String siteid;
        private String dt;
        private String dn;
        private String createtime;

        public String getCreatetime() {
            return createtime;
        }

        public void setCreatetime(String createtime) {
            this.createtime = createtime;
        }

        public String getDt() {
            return dt;
        }

        public void setDt(String dt) {
            this.dt = dt;
        }

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public String getPaymoney() {
            return paymoney;
        }

        public void setPaymoney(String paymoney) {
            this.paymoney = paymoney;
        }

        public String getVip_id() {
            return vip_id;
        }

        public void setVip_id(String vip_id) {
            this.vip_id = vip_id;
        }

        public String getUpdatetime() {
            return updatetime;
        }

        public void setUpdatetime(String updatetime) {
            this.updatetime = updatetime;
        }

        public String getSiteid() {
            return siteid;
        }

        public void setSiteid(String siteid) {
            this.siteid = siteid;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }
    }

    public static class GdmPcenterMemPayMoneyLog {

        public static GdmPcenterMemPaymoney generateLog(String uid) {
            GdmPcenterMemPaymoney memPaymoney = new GdmPcenterMemPaymoney();
            Random random = new Random();
            DecimalFormat df = new DecimalFormat("0.00");
            double money = random.nextDouble() * 1000;
            memPaymoney.setPaymoney(df.format(money));
            memPaymoney.setDt(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDateTime.now().minusDays(8)));
            memPaymoney.setDn("webA");
            memPaymoney.setSiteid(String.valueOf(random.nextInt(5)));
            memPaymoney.setVip_id(String.valueOf(random.nextInt(5)));
            memPaymoney.setUid(uid);
//            String registerdata = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//                    .format(RondomDate.randomDate("2019-06-30 10:00:00", "2019-06-30 11:00:00"));
            memPaymoney.setCreatetime(String.valueOf(System.currentTimeMillis()));
            return memPaymoney;
        }

    }
}

11.3.4用户注册跳转地址表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.Random;

public class baseMemberRegtypeProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0; i < 1000000; i++) {
            GdmMemberRegType memberRegType = GdmMemberRegTypeLog.generateLog(String.valueOf(i), "webA");
            String jsonString = JSON.toJSonString(memberRegType);
            producer.send(new ProducerRecord("memberregtype", jsonString));
        }
        producer.flush();
        producer.close();
    }

    public static class GdmMemberRegTypeLog {

        private static String[] webAappregUrl = new String[]{
                "http:www.webA.com/product/register/index.html", "http:www.webA.com/sale/register/index.html",
                "http:www.webA.com/product10/register/aa/index.html",
                "http:www.webA.com/hhh/wwww/index.html"};
        private static String[] webBappregUrl = new String[]{
                "http:www.webB.com/product/register/index.html", "http:www.webB.com/sale/register/index.html",
                "http:www.webB.com/hhh/wwww/index.html"};
        private static String[] webcappregUrl = new String[]{
                "http:www.webC.com/product/register/index.html", "http:www.webB.com/sale/register/index.html",
                "http:www.webC.com/product52/register/ac/index.html"};


        public static GdmMemberRegType generateLog(String uid, String dn) {
            GdmMemberRegType memberRegType = new GdmMemberRegType();
            memberRegType.setAppkey("-");
            Random random = new Random();
            String url = "";
            int index = random.nextInt(4);
            switch (dn) {
                case "webA":
                    url = webAappregUrl[index];
                    break;
                case "webB":
                    url = webBappregUrl[index];
                    break;
                case "webC":
                    url = webcappregUrl[index];
                    break;
            }
            memberRegType.setAppregurl(url);
            memberRegType.setBdp_uuid("-");
//            String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//                    .format(RondomDate.randomDate("2019-06-30 10:00:00", "2019-06-30 11:00:00"));
            memberRegType.setCreatetime(String.valueOf(System.currentTimeMillis()));
            memberRegType.setDt(DateTimeFormatter.ofPattern("yyyyMMdd").format(LocalDateTime.now().minusDays(8)));
            memberRegType.setDn(dn);
            memberRegType.setDomain("-");
            memberRegType.setIsranreg("-");
            memberRegType.setDomain("-");
            memberRegType.setWebsiteid(String.valueOf(random.nextInt(5)));
            memberRegType.setRegsource(String.valueOf(random.nextInt(5)));
            memberRegType.setUid(uid);
            return memberRegType;
        }
    }

    public static class GdmMemberRegType {

        private String reflagid;
        private String uid;
        private String regsource; //注册来源 1.pc 2.mobile 3.app 4.wechat
        private String appkey;
        private String appregurl;
        private String websiteid;
        private String domain;
        private String isranreg;
        private String bdp_uuid;
        private String createtime;
        private String dt;
        private String dn;


        public String getDt() {
            return dt;
        }

        public void setDt(String dt) {
            this.dt = dt;
        }

        public String getReflagid() {
            return reflagid;
        }

        public void setReflagid(String reflagid) {
            this.reflagid = reflagid;
        }

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public String getRegsource() {
            return regsource;
        }

        public void setRegsource(String regsource) {
            this.regsource = regsource;
        }

        public String getAppkey() {
            return appkey;
        }

        public void setAppkey(String appkey) {
            this.appkey = appkey;
        }

        public String getAppregurl() {
            return appregurl;
        }

        public void setAppregurl(String appregurl) {
            this.appregurl = appregurl;
        }

        public String getWebsiteid() {
            return websiteid;
        }

        public void setWebsiteid(String websiteid) {
            this.websiteid = websiteid;
        }

        public String getDomain() {
            return domain;
        }

        public void setDomain(String domain) {
            this.domain = domain;
        }

        public String getIsranreg() {
            return isranreg;
        }

        public void setIsranreg(String isranreg) {
            this.isranreg = isranreg;
        }

        public String getBdp_uuid() {
            return bdp_uuid;
        }

        public void setBdp_uuid(String bdp_uuid) {
            this.bdp_uuid = bdp_uuid;
        }

        public String getCreatetime() {
            return createtime;
        }

        public void setCreatetime(String createtime) {
            this.createtime = createtime;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }
    }

}

11.3.5 Vip级别基础表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Properties;

public class baseMemberVipProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
            props.put("acks", "-1");
            props.put("batch.size", "16384");
            props.put("linger.ms", "10");
            props.put("buffer.memory", "33554432");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer producer = new KafkaProducer(props);
            for (int i = 0; i < 5; i++) {
                GdmPcenterMemViplevel memViplevel= GdmPcenterMemViplevelLog.generateLog(String.valueOf(i));
                String jsonString = JSON.toJSonString(memViplevel);
                producer.send(new ProducerRecord("membervip", jsonString));
            }
            producer.flush();
            producer.close();
    }
    public static  class GdmPcenterMemViplevelLog {

        private static String[] vipLevels = new String[]{"普通会员", "白金", "银卡", "金卡", "钻石"};

        public static GdmPcenterMemViplevel generateLog(String vipid) {
            GdmPcenterMemViplevel memViplevel = new GdmPcenterMemViplevel();
            memViplevel.setDiscountval("-");
            memViplevel.setDn("webA");
            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .format(RondomDate.randomDate("2015-01-01 00:00:00", "2016-06-30 00:00:00"));
            String time2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                    .format(RondomDate.randomDate("2016-01-01 00:00:00", "2019-06-30 00:00:00"));
            memViplevel.setLast_modify_time("");
            memViplevel.setStart_time(time);
            memViplevel.setEnd_time(time2);
            memViplevel.setLast_modify_time(time2);
            memViplevel.setMax_free("-");
            memViplevel.setMin_free("-");
            memViplevel.setNext_level("-");
            memViplevel.setVip_id(vipid);
            memViplevel.setOperator("update");
            memViplevel.setVip_level(vipLevels[Integer.parseInt(vipid)]);
            return memViplevel;
        }

    }
    public static class GdmPcenterMemViplevel {

        private String vip_id;
        private String vip_name;
        private String vip_level;
        private String min_free;
        private String max_free;
        private String start_time;
        private String end_time;
        private String next_level;
        private String discountval;
        private String last_modify_time;
        private String operator;
        private String siteid;
        private String dn;

        public String getVip_id() {
            return vip_id;
        }

        public void setVip_id(String vip_id) {
            this.vip_id = vip_id;
        }

        public String getVip_name() {
            return vip_name;
        }

        public void setVip_name(String vip_name) {
            this.vip_name = vip_name;
        }

        public String getVip_level() {
            return vip_level;
        }

        public void setVip_level(String vip_level) {
            this.vip_level = vip_level;
        }

        public String getMin_free() {
            return min_free;
        }

        public void setMin_free(String min_free) {
            this.min_free = min_free;
        }

        public String getMax_free() {
            return max_free;
        }

        public void setMax_free(String max_free) {
            this.max_free = max_free;
        }

        public String getStart_time() {
            return start_time;
        }

        public void setStart_time(String start_time) {
            this.start_time = start_time;
        }

        public String getEnd_time() {
            return end_time;
        }

        public void setEnd_time(String end_time) {
            this.end_time = end_time;
        }

        public String getNext_level() {
            return next_level;
        }

        public void setNext_level(String next_level) {
            this.next_level = next_level;
        }

        public String getDiscountval() {
            return discountval;
        }

        public void setDiscountval(String discountval) {
            this.discountval = discountval;
        }

        public String getLast_modify_time() {
            return last_modify_time;
        }

        public void setLast_modify_time(String last_modify_time) {
            this.last_modify_time = last_modify_time;
        }

        public String getOperator() {
            return operator;
        }

        public void setOperator(String operator) {
            this.operator = operator;
        }

        public String getSiteid() {
            return siteid;
        }

        public void setSiteid(String siteid) {
            this.siteid = siteid;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }
    }

}

11.3.6网站信息基础表

package com.atguigu.education.kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class baseWebSiteKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("acks", "-1");
        props.put("batch.size", "16384");
        props.put("linger.ms", "10");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
        for (int i = 0; i < 5; i++) {
            GdmbaseWebsite gdmbaseWebsite = GdmbaseWebSieLog.generateLog(String.valueOf(i), "webA");
            String jsonString = JSON.toJSonString(gdmbaseWebsite);
            producer.send(new ProducerRecord("basewebsite", jsonString));
        }
        for (int i = 0; i < 5; i++) {
            GdmbaseWebsite gdmbaseWebsite = GdmbaseWebSieLog.generateLog(String.valueOf(i), "webB");
            String jsonString = JSON.toJSonString(gdmbaseWebsite);
            producer.send(new ProducerRecord("basewebsite", jsonString));
        }
        for (int i = 0; i < 5; i++) {
            GdmbaseWebsite gdmbaseWebsite = GdmbaseWebSieLog.generateLog(String.valueOf(i), "webC");
            String jsonString = JSON.toJSonString(gdmbaseWebsite);
            producer.send(new ProducerRecord("basewebsite", jsonString));
        }
        producer.flush();
        producer.close();
    }
    public static class GdmbaseWebsite {

        private String siteid;
        private String sitename;
        private String siteurl;
        private String creator;
        private String createtime;
        private String delete;
        private String dn;

        public String getSiteid() {
            return siteid;
        }

        public void setSiteid(String siteid) {
            this.siteid = siteid;
        }

        public String getSitename() {
            return sitename;
        }

        public void setSitename(String sitename) {
            this.sitename = sitename;
        }

        public String getSiteurl() {
            return siteurl;
        }

        public void setSiteurl(String siteurl) {
            this.siteurl = siteurl;
        }

        public String getCreator() {
            return creator;
        }

        public void setCreator(String creator) {
            this.creator = creator;
        }

        public String getCreatetime() {
            return createtime;
        }

        public void setCreatetime(String createtime) {
            this.createtime = createtime;
        }

        public String getDelete() {
            return delete;
        }

        public void setDelete(String delete) {
            this.delete = delete;
        }

        public String getDn() {
            return dn;
        }

        public void setDn(String dn) {
            this.dn = dn;
        }
    }
    public static class GdmbaseWebSieLog {

        private static String[] siteNames = new String[]{"百度", "163", "114", "126", "谷歌"};
        private static String[] siteUrls = new String[]{"wwww.baidu.com", "www.163.com", "www.114.com",
                "www.126.com", "www.google.com"};

        public static GdmbaseWebsite generateLog(String siteid, String dn) {
            GdmbaseWebsite baseWebsite = new GdmbaseWebsite();
            Random rand = new Random();
            baseWebsite.setCreatetime("2000-01-01");
            baseWebsite.setDelete("0");
            baseWebsite.setSiteid(siteid);
            baseWebsite.setCreator("admin");
            baseWebsite.setDn(dn);
            int index = Integer.parseInt(siteid);
            baseWebsite.setSitename(siteNames[index]);
            baseWebsite.setSiteurl(siteUrls[index] + "/" + dn);
            return baseWebsite;
        }
    }
}

编写完代码后,点击运行向对应的topic发送数据。注意:用户基础表、用户支付金额表、用户跳转地址表这三张表的数据最好同时发送,以保证三条流之间的事件时间延迟误差不会太大

11.4编写case class
  1. 编写case class用于flink序列化与反序列化

package com.atguigu.education.model

case class DwdMemberPayMoney(uid: Int, var paymoney: String, siteid: Int, vip_id: Int, createtime: String, dt: String, dn: String)

package com.atguigu.education.model

// Dimension record: one VIP-level row (written to HBase keyed by vip_id + "_" + dn).
// NOTE(review): min_free/max_free look like fee thresholds ("fee" vs "free") — confirm naming.
case class baseViplevel(vip_id: Int, vip_level: String, start_time: String,
                        end_time: String, last_modify_time: String, max_free: String,
                        min_free: String, next_level: String, operator: String, dn: String)

package com.atguigu.education.model

case class baseWebSite(siteid: Int, sitename: String, siteurl: String, delete: String, createtime: String, creator: String, dn: String)

package com.atguigu.education.model

// Fact record: member base information; register is parsed with toLong and used
// as the event-time timestamp by the join job.
case class DwdMember(uid: Int, ad_id: Int, birthday: String, email: String, fullname: String, iconurl: String,
                     lastlogin: String, mailaddr: String, memberlevel: String, password: String, paymoney: String, phone: String,
                     qq: String, register: String, regupdatetime: String, unitname: String, userip: String, zipcode: String, dt: String,
                     dn: String)

package com.atguigu.education.model

case class DwdMemberPayMoney(uid: Int, var paymoney: String, siteid: Int, vip_id: Int, createtime: String, dt: String, dn: String)

package com.atguigu.education.model

// Fact record: member registration source/jump info; createtime is parsed with
// toLong and used as the event-time timestamp by the join job.
case class DwdMemberRegtype(uid: Int, appkey: String, appregurl: String, bdp_uuid: String, createtime: String,
                            isranreg: String, regsource: String, websiteid: String, dt: String, dn: String)

package com.atguigu.education.model

case class TopicAndValue(topic:String,value:String)

11.5编写flink source与sink的反序列化类和序列化类

package com.atguigu.education.model

import java.nio.charset.Charset

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema


/**
 * Serialization schema for the dwd-layer Kafka producer: writes each record's
 * raw JSON value and routes it to a "dwd"-prefixed copy of its source topic.
 */
class DwdKafkaProducerSerializationSchema extends KeyedSerializationSchema[TopicAndValue] {
  val serialVersionUID = 1351665280744549933L;

  // No message key: records are distributed by the producer's default partitioner.
  override def serializeKey(element: TopicAndValue): Array[Byte] = null

  // FIX: the method must be named serializeValue (capital V) to override
  // KeyedSerializationSchema.serializeValue; the original "serializevalue"
  // fails to compile as an override.
  override def serializeValue(element: TopicAndValue): Array[Byte] = {
    element.value.getBytes(Charset.forName("utf-8"))
  }

  // Target topic = "dwd" + source topic (e.g. member -> dwdmember).
  override def getTargetTopic(element: TopicAndValue): String = {
    "dwd" + element.topic
  }
}

package com.atguigu.education.model

import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Decodes dwdmember Kafka records (UTF-8 JSON) into DwdMember instances via Gson. */
class DwdMemberDeserializationSchema extends KafkaDeserializationSchema[DwdMember] {

  /** Unbounded stream: no element ever terminates it. */
  override def isEndOfStream(nextElement: DwdMember): Boolean = false

  /** Parse the record value as JSON and bind it to the case class. */
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): DwdMember = {
    val json = new String(record.value(), "utf-8")
    new Gson().fromJson(json, classOf[DwdMember])
  }

  /** Element type info for Flink's type system. */
  override def getProducedType: TypeInformation[DwdMember] =
    TypeInformation.of(new TypeHint[DwdMember] {})
}

package com.atguigu.education.model

import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Decodes dwdmemberpaymoney Kafka records (UTF-8 JSON) into DwdMemberPayMoney via Gson. */
class DwdMemberPayMoneyDeserializationSchema extends KafkaDeserializationSchema[DwdMemberPayMoney] {

  /** Unbounded stream: no element ever terminates it. */
  override def isEndOfStream(nextElement: DwdMemberPayMoney): Boolean = false

  /** Parse the record value as JSON and bind it to the case class. */
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): DwdMemberPayMoney = {
    val json = new String(record.value(), "utf-8")
    new Gson().fromJson(json, classOf[DwdMemberPayMoney])
  }

  /** Element type info for Flink's type system. */
  override def getProducedType: TypeInformation[DwdMemberPayMoney] =
    TypeInformation.of(new TypeHint[DwdMemberPayMoney] {})
}

package com.atguigu.education.model

import com.google.gson.Gson
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Decodes dwdmemberregtype Kafka records (UTF-8 JSON) into DwdMemberRegtype via Gson. */
class DwdMemberRegtypeDeserializationSchema extends KafkaDeserializationSchema[DwdMemberRegtype] {

  /** Unbounded stream: no element ever terminates it. */
  override def isEndOfStream(nextElement: DwdMemberRegtype): Boolean = false

  /** Parse the record value as JSON and bind it to the case class. */
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): DwdMemberRegtype = {
    val json = new String(record.value(), "utf-8")
    new Gson().fromJson(json, classOf[DwdMemberRegtype])
  }

  /** Element type info for Flink's type system. */
  override def getProducedType: TypeInformation[DwdMemberRegtype] =
    TypeInformation.of(new TypeHint[DwdMemberRegtype] {})
}

package com.atguigu.education.model

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Wraps each raw Kafka record as (source topic, UTF-8 decoded value) without parsing it. */
class TopicAndValueDeserializationSchema extends KafkaDeserializationSchema[TopicAndValue] {

  /** Unbounded stream: never signals end-of-stream. */
  override def isEndOfStream(t: TopicAndValue): Boolean = false

  /** Keep the payload opaque; downstream decides how to parse per topic. */
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): TopicAndValue =
    TopicAndValue(consumerRecord.topic(), new String(consumerRecord.value(), "utf-8"))

  /** Element type info for Flink's type system. */
  override def getProducedType: TypeInformation[TopicAndValue] =
    TypeInformation.of(new TypeHint[TopicAndValue] {})
}

11.6编写实时数仓ods层处理

11.6.1工具类

package com.atguigu.education.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;


public class ParseJsonData {

    public static JSonObject getJsonData(String data) {
        try {

            return JSONObject.parseObject(data);
        } catch (Exception e) {
            return null;
        }
    }

    public static String getJsonString(Object o) {
        return JSON.toJSonString(o);
    }
}

11.6.2编写ods逻辑

(1)flink监控多个topic,先对topic中数据进行过滤,使用侧输出流将维度表数据和事实表数据区分开,维度表写入Hbase表中,事实表写入第二层kafka topic

package com.atguigu.education.etl

import java.util.Properties

import com.alibaba.fastjson.JSONObject
import com.atguigu.education.model.{DwdKafkaProducerSerializationSchema, GlobalConfig, TopicAndValue, TopicAndValueDeserializationSchema}
import com.atguigu.education.util.ParseJsonData
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.util.Collector

//flink run -m yarn-cluster -ynm odsetldata -p 12 -ys 4  -yjm 1024 -ytm 2048m -d -c com.atguigu.education.etl.OdsEtlData -yqu flink ./education-flink-online-1.0-SNAPSHOT-jar-with-dependencies.jar --group.id test --bootstrap.servers hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic basewebsite,basead,member,memberpaymoney,memberregtype,membervip

//--group.id test --bootstrap.servers hadoop101:9092,hadoop102:9092,hadoop103:9092 --topic basewebsite,basead,member,memberpaymoney,memberregtype,membervip
// ODS-layer ETL job: consumes the six raw topics, filters out non-JSON records,
// then splits the stream — dimension-table topics go to HBase via a side output,
// fact-table topics are forwarded to "dwd"-prefixed Kafka topics.
object OdsEtlData {
  val BOOTSTRAP_SERVERS = "bootstrap.servers"
  val GROUP_ID = "group.id"
  val RETRIES = "retries"
  val TOPIC = "topic"

  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.getConfig.setGlobalJobParameters(params)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event-time semantics

    // Checkpoint configuration
    env.enableCheckpointing(60000l) // checkpoint once per minute
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // exactly-once
    checkpointConfig.setMinPauseBetweenCheckpoints(30000l) // at least 30s between checkpoints
    checkpointConfig.setCheckpointTimeout(10000l) // checkpoint timeout; NOTE(review): 10s is shorter than the 60s interval — confirm intended
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep checkpoints when the job is cancelled
    // State backend: RocksDB on HDFS (disabled for local runs)
    //    val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://mycluster/flink/checkpoint")
    //    env.setStateBackend(stateBackend)

    // Restart strategy: 3 retries, 10 seconds apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
    import scala.collection.JavaConverters._
    val topicList = params.get(TOPIC).split(",").toBuffer.asJava
    val consumerProps = new Properties()
    consumerProps.setProperty(BOOTSTRAP_SERVERS, params.get(BOOTSTRAP_SERVERS))
    consumerProps.setProperty(GROUP_ID, params.get(GROUP_ID))
    val kafaEventSource = new FlinkKafkaConsumer010[TopicAndValue](topicList, new TopicAndValueDeserializationSchema, consumerProps)
    kafaEventSource.setStartFromEarliest()

    val dataStream = env.addSource(kafaEventSource).filter(item => {
      // Drop records whose value is not valid JSON (getJsonData returns null then,
      // and null.isInstanceOf[JSONObject] is false).
      val obj = ParseJsonData.getJsonData(item.value)
      obj.isInstanceOf[JSONObject]
    })
    // Split the stream: dimension-table records go to HBase via a side output,
    // fact-table records continue to the second-layer Kafka topics.
    val sideOutHbaseTag = new OutputTag[TopicAndValue]("hbaseSinkStream")
    //    val sideOutGreenPlumTag = new OutputTag[TopicAndValue]("greenplumSinkStream")
    //    val sideOutKuduTag = new OutputTag[TopicAndValue]("kuduSinkStream")
    val result = dataStream.process(new ProcessFunction[TopicAndValue, TopicAndValue] {
      override def processElement(value: TopicAndValue, ctx: ProcessFunction[TopicAndValue, TopicAndValue]#Context, out: Collector[TopicAndValue]): Unit = {
        value.topic match {
          // The three dimension topics are diverted to the HBase side output.
          case "basead" | "basewebsite" | "membervip" => ctx.output(sideOutHbaseTag, value)
          case _ => out.collect(value)
        }
      }
    })
    // Side output: dimension-table records into HBase.
    result.getSideOutput(sideOutHbaseTag).addSink(new DwdHbaseSink)
    // Main output: fact-table records into the dwd-layer Kafka topics
    // (the target topic is chosen per record by the serialization schema).
    result.addSink(new FlinkKafkaProducer010[TopicAndValue](GlobalConfig.BOOTSTRAP_SERVERS, "", new DwdKafkaProducerSerializationSchema))
    env.execute()
  }
}

11.6.3编写hbase sink

(1)维度表为广告基础信息表、Vip级别基础表、网站信息基础表,三表数据分别写入对应的hbase表中

package com.atguigu.education.etl

import com.atguigu.education.model.{baseAd, baseViplevel, baseWebSite, GlobalConfig, TopicAndValue}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HbaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sink that routes the three dimension-table topics into their HBase tables,
// one Put per record, row key = <id> + "_" + dn, all columns in family "info".
// NOTE(review): the imported configuration class is spelled HbaseConfiguration
// here; the real HBase API class is HBaseConfiguration — confirm against the
// project's imports before building.
class DwdHbaseSink extends RichSinkFunction[TopicAndValue] {
  var connection: Connection = _

  // Open the (thread-shared) HBase connection once per sink instance.
  override def open(parameters: Configuration): Unit = {
    val conf = HbaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", GlobalConfig.Hbase_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", GlobalConfig.Hbase_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
  }

  // Dispatch each record to the HBase table matching its source topic.
  // The catch-all branch handles "membervip" (the only other topic routed here).
  override def invoke(value: TopicAndValue, context: SinkFunction.Context[_]): Unit = {
    value.topic match {
      case "basewebsite" => invokebaseWebSite("education:dwd_basewebsite", value)
      case "basead" => invokebasebaseAd("education:dwd_basead", value)
      case _ => invokebaseVipLevel("education:dwd_membervip", value)
    }
  }


  // Close the HBase connection when the sink is disposed.
  override def close(): Unit = {
    super.close()
    connection.close()
  }

  // Write one website dimension record. Int fields (siteid) are stored as
  // 4-byte binary ints by Bytes.toBytes.
  // NOTE(review): parameter name "tableNamae" is a typo for "tableName".
  def invokebaseWebSite(tableNamae: String, value: TopicAndValue): Unit = {
    val gson = new Gson()
    val basewebsite = gson.fromJson(value.value, classOf[baseWebSite])
    val table = connection.getTable(TableName.valueOf(tableNamae))
    val put = new Put(Bytes.toBytes(basewebsite.siteid + "_" + basewebsite.dn))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("siteid"), Bytes.toBytes(basewebsite.siteid))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sitename"), Bytes.toBytes(basewebsite.sitename))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("siteurl"), Bytes.toBytes(basewebsite.siteurl))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("delete"), Bytes.toBytes(basewebsite.delete))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("createtime"), Bytes.toBytes(basewebsite.createtime))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("creator"), Bytes.toBytes(basewebsite.creator))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dn"), Bytes.toBytes(basewebsite.dn))
    table.put(put)
    table.close()
  }

  // Write one ad dimension record (adid, adname, dn).
  def invokebasebaseAd(tableName: String, value: TopicAndValue) = {
    val gson = new Gson()
    val basead = gson.fromJson(value.value, classOf[baseAd])
    val table = connection.getTable(TableName.valueOf(tableName))
    val put = new Put(Bytes.toBytes(basead.adid + "_" + basead.dn))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("adid"), Bytes.toBytes(basead.adid))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("adname"), Bytes.toBytes(basead.adname))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dn"), Bytes.toBytes(basead.dn))
    table.put(put)
    table.close()
  }

  // Write one VIP-level dimension record.
  def invokebaseVipLevel(tableName: String, value: TopicAndValue) = {
    val gson = new Gson()
    val baseViplevel = gson.fromJson(value.value, classOf[baseViplevel])
    val table = connection.getTable(TableName.valueOf(tableName))
    val put = new Put(Bytes.toBytes(baseViplevel.vip_id + "_" + baseViplevel.dn))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("vip_id"), Bytes.toBytes(baseViplevel.vip_id))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("vip_level"), Bytes.toBytes(baseViplevel.vip_level))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("start_time"), Bytes.toBytes(baseViplevel.start_time))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("end_time"), Bytes.toBytes(baseViplevel.end_time))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("last_modify_time"), Bytes.toBytes(baseViplevel.last_modify_time))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("max_free"), Bytes.toBytes(baseViplevel.max_free))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("min_free"), Bytes.toBytes(baseViplevel.min_free))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("next_level"), Bytes.toBytes(baseViplevel.next_level))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("operator"), Bytes.toBytes(baseViplevel.operator))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dn"), Bytes.toBytes(baseViplevel.dn))
    table.put(put)
    table.close()
  }
}

11.6.4编写kafka sink

(1)重写TargetTopic方法,此方法就是要发往的topic名称,拼上前缀各ods层topic发往对应dwd层topic

package com.atguigu.education.model

import java.nio.charset.Charset

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema


/**
 * Serialization schema for the dwd-layer Kafka producer (duplicate listing of
 * the class shown earlier in this document — keep only one in the project).
 */
class DwdKafkaProducerSerializationSchema extends KeyedSerializationSchema[TopicAndValue] {
  val serialVersionUID = 1351665280744549933L;

  // No message key: records are distributed by the producer's default partitioner.
  override def serializeKey(element: TopicAndValue): Array[Byte] = null

  // FIX: must be serializeValue (capital V) to override the interface method;
  // the original "serializevalue" fails to compile as an override.
  override def serializeValue(element: TopicAndValue): Array[Byte] = {
    element.value.getBytes(Charset.forName("utf-8"))
  }

  // Target topic = "dwd" + source topic.
  override def getTargetTopic(element: TopicAndValue): String = {
    "dwd" + element.topic
  }
}

11.6.5编写dim逻辑

(1)dim层逻辑,flink程序监听dwd层topic,也就是对应的三张事实表数据,用户基础表、用户支付金额表、用户注册跳转地址表,形成三个流进行三流join,以用户表为主表使用flink co group实现left join。三流join完之后再使用flink async异步io关联维度表数据,最终写入hbase宽表

package com.atguigu.education.etl

import java.lang
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Properties

import akka.remote.WireFormats.TimeUnit
import com.alibaba.fastjson.{JSON, JSONObject}
import com.atguigu.education.model._
import com.atguigu.education.util.ParseJsonData
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.types.parser.LongParser
import org.apache.flink.util.Collector

import scala.collection.mutable

//flink run -m yarn-cluster -ynm dimetl -p 12 -ys 4 -yjm 1024 -ytm 2048m -d -c com.atguigu.education.etl.DImJoinData  -yqu flink ./education-flink-online-1.0-SNAPSHOT-jar-with-dependencies.jar --group.id test --bootstrap.servers hadoop101:9092,hadoop102:9092,hadoop103:9092
//--bootstrap.servers hadoop101:9092,hadoop102:9092,hadoop103:9092 --group.id test

object DImJoinData {
  val BOOTSTRAP_SERVERS = "bootstrap.servers"
  val GROUP_ID = "group.id"
  val RETRIES = "retries"
  val TOPIC = "topic"

  // DIM-layer job: three-way event-time left join of the dwd fact topics
  // (member <- regtype <- paymoney), then async enrichment against the HBase
  // dimension tables and a Kudu sink.
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.getConfig.setGlobalJobParameters(params)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event-time semantics
    // Checkpoint configuration
    env.enableCheckpointing(60000l) // checkpoint once per minute
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // exactly-once
    checkpointConfig.setMinPauseBetweenCheckpoints(30000l) // at least 30s between checkpoints
    checkpointConfig.setCheckpointTimeout(10000l) // checkpoint timeout; NOTE(review): 10s < the 60s interval — confirm intended
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep checkpoints when the job is cancelled
    // State backend: RocksDB on HDFS (disabled for local runs)
    //        val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://mycluster/flink/checkpoint")
    //        env.setStateBackend(stateBackend)

    // Restart strategy: 3 retries, 10 seconds apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.seconds(10)))

    val consumerProps = new Properties()
    consumerProps.setProperty(BOOTSTRAP_SERVERS, params.get(BOOTSTRAP_SERVERS))
    consumerProps.setProperty(GROUP_ID, params.get(GROUP_ID))
    val dwdmemberSource = new FlinkKafkaConsumer010[DwdMember]("dwdmember", new DwdMemberDeserializationSchema, consumerProps)
    val dwdmemberpaymoneySource = new FlinkKafkaConsumer010[DwdMemberPayMoney]("dwdmemberpaymoney", new DwdMemberPayMoneyDeserializationSchema, consumerProps)
    val dwdmemberregtypeSource = new FlinkKafkaConsumer010[DwdMemberRegtype]("dwdmemberregtype", new DwdMemberRegtypeDeserializationSchema, consumerProps)
    dwdmemberSource.setStartFromEarliest()
    dwdmemberpaymoneySource.setStartFromEarliest()
    dwdmemberregtypeSource.setStartFromEarliest()

    // Member stream: registration time as event time, 10s out-of-orderness bound
    val dwdmemberStream = env.addSource(dwdmemberSource).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[DwdMember](Time.seconds(10)) {
      override def extractTimestamp(element: DwdMember): Long = {
        element.register.toLong
      }
    })

    // Pay-money stream: creation time as event time, 10s out-of-orderness bound
    val dwdmemberpaymoneyStream = env.addSource(dwdmemberpaymoneySource).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[DwdMemberPayMoney](Time.seconds(10)) {
      override def extractTimestamp(element: DwdMemberPayMoney): Long = {
        element.createtime.toLong
      }
    })
    // Regtype stream: creation time as event time, 10s out-of-orderness bound
    val dwdmemberregtypeStream = env.addSource(dwdmemberregtypeSource).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[DwdMemberRegtype](Time.seconds(10)) {
      override def extractTimestamp(element: DwdMemberRegtype): Long = {
        element.createtime.toLong
      }
    })

    // Join 1: member LEFT JOIN regtype on uid+dn, implemented with coGroup
    // over 10-minute tumbling event-time windows, firing per element.
    val dwdmemberLeftJoinRegtyeStream = dwdmemberStream.coGroup(dwdmemberregtypeStream)
      .where(item => item.uid + "_" + item.dn).equalTo(item => item.uid + "_" + item.dn)
      //      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      //      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
      .window(TumblingEventTimeWindows.of(Time.minutes(10)))
      .trigger(CountTrigger.of(1))
      .apply(new MemberLeftJoinRegtype)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(10)) {
        override def extractTimestamp(element: String): Long = {
          // Re-derive event time from the "register" field of the joined JSON.
          val register = ParseJsonData.getJsonData(element).getString("register")
          register.toLong
        }
      })
    // Join 2: (member+regtype) LEFT JOIN paymoney, again on uid+dn.
    val resultStream = dwdmemberLeftJoinRegtyeStream.coGroup(dwdmemberpaymoneyStream)
      .where(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val uid = jsonObject.getString("uid")
        val dn = jsonObject.getString("dn")
        uid + "_" + dn
      }).equalTo(item => item.uid + "_" + item.dn)
      .window(TumblingEventTimeWindows.of(Time.minutes(10)))
      .trigger(CountTrigger.of(1))
      .apply(new resultStreamCoGroupFunction)
       // Async enrichment against the HBase dimension tables: 10-minute timeout,
       // at most 12 in-flight requests, unordered results.
       val resultDStream = AsyncDataStream.unorderedWait(resultStream, new DimHbaseAsyncFunction, 10l, java.util.concurrent.TimeUnit.MINUTES, 12)
    //    val resultDStream = AsyncDataStream.unorderedWait(resultStream, new DimKuduAsyncFunction, 10l, java.util.concurrent.TimeUnit.MINUTES, 12)
    resultDStream.print()
    resultDStream.addSink(new DwsMemberKuduSink)
    env.execute()
  }

  class MemberLeftJoinRegtype extends CoGroupFunction[DwdMember, DwdMemberRegtype, String] {
    override def coGroup(first: lang.Iterable[DwdMember], second: lang.Iterable[DwdMemberRegtype], out: Collector[String]): Unit = {
      var bl = false
      val leftIterator = first.iterator()
      val rightIterator = second.iterator()
      while (leftIterator.hasNext) {
        val dwdMember = leftIterator.next()
        val jsonObject = new JSonObject()
        jsonObject.put("uid", dwdMember.uid)
        jsonObject.put("ad_id", dwdMember.ad_id)
        jsonObject.put("birthday", dwdMember.birthday)
        jsonObject.put("email", dwdMember.email)
        jsonObject.put("fullname", dwdMember.fullname)
        jsonObject.put("iconurl", dwdMember.iconurl)
        jsonObject.put("lastlogin", dwdMember.lastlogin)
        jsonObject.put("mailaddr", dwdMember.mailaddr)
        jsonObject.put("memberlevel", dwdMember.memberlevel)
        jsonObject.put("password", dwdMember.password)
        jsonObject.put("phone", dwdMember.phone)
        jsonObject.put("qq", dwdMember.qq)
        jsonObject.put("register", dwdMember.register)
        jsonObject.put("regupdatetime", dwdMember.regupdatetime)
        jsonObject.put("unitname", dwdMember.unitname)
        jsonObject.put("userip", dwdMember.userip)
        jsonObject.put("zipcode", dwdMember.zipcode)
        jsonObject.put("dt", dwdMember.dt)
        jsonObject.put("dn", dwdMember.dn)
        while (rightIterator.hasNext) {
          val dwdMemberRegtype = rightIterator.next()
          jsonObject.put("appkey", dwdMemberRegtype.appkey)
          jsonObject.put("appregurl", dwdMemberRegtype.appregurl)
          jsonObject.put("bdp_uuid", dwdMemberRegtype.bdp_uuid)
          jsonObject.put("createtime", dwdMemberRegtype.createtime)
          jsonObject.put("isranreg", dwdMemberRegtype.isranreg)
          jsonObject.put("regsource", dwdMemberRegtype.regsource)
          jsonObject.put("websiteid", dwdMemberRegtype.websiteid)
          bl = true
          out.collect(jsonObject.toJSONString)
        }
        if (!bl) {
          jsonObject.put("appkey", "")
          jsonObject.put("appregurl", "")
          jsonObject.put("bdp_uuid", "")
          jsonObject.put("createtime", "")
          jsonObject.put("isranreg", "")
          jsonObject.put("regsource", "")
          jsonObject.put("websiteid", "")
          out.collect(jsonObject.toJSONString)
        }
      }
    }
  }

  class resultStreamCoGroupFunction extends CoGroupFunction[String, DwdMemberPayMoney, String] {
    override def coGroup(first: lang.Iterable[String], second: lang.Iterable[DwdMemberPayMoney], out: Collector[String]): Unit = {
      var bl = false
      val leftIterator = first.iterator()
      val rightIterator = second.iterator()
      while (leftIterator.hasNext) {
        val jsonObject = ParseJsonData.getJsonData(leftIterator.next())
        while (rightIterator.hasNext) {
          val dwdMemberPayMoney = rightIterator.next()
          jsonObject.put("paymoney", dwdMemberPayMoney.paymoney)
          jsonObject.put("siteid", dwdMemberPayMoney.siteid)
          jsonObject.put("vip_id", dwdMemberPayMoney.vip_id)
          bl = true
          out.collect(jsonObject.toJSONString)
        }
        if (!bl) {
          jsonObject.put("paymoney", "")
          jsonObject.put("siteid", "")
          jsonObject.put("vip_id", "")
          out.collect(jsonObject.toJSONString)
        }
      }
    }
  }

}

11.6.6编写async io关联hbase维度表表

(1)使用线程池,异步操作数据库,同时使用guava缓存功能,减少对hbase的查询操作,关联维度表数据后,再写入hbase宽表

package com.atguigu.education.etl

import java.math.BigInteger
import java.security.MessageDigest
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import com.alibaba.fastjson.JSONObject
import com.atguigu.education.model.{GlobalConfig, ResultMode}
import com.atguigu.education.util.ParseJsonData
import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import org.apache.flink.util.ExecutorUtils
import org.apache.hadoop.hbase.{CellUtil, HbaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes


/**
 * Flink async-I/O function that enriches member JSON records with dimension
 * data from three HBase tables (ad, website, vip), caches lookups in a Guava
 * cache to cut down HBase round-trips, and writes the joined record back into
 * the HBase wide table `education:dim_member`.
 *
 * Fixes vs. original:
 *  - one shared HBase Connection created in open() / closed in close()
 *    (original opened a new, never-closed Connection per record — a leak);
 *  - `JSonObject` / `clonevalue` / `HbaseConfiguration` typos corrected
 *    (the originals do not compile against fastjson / HBase client);
 *  - error path reports e.getMessage instead of concatenating the Unit
 *    result of printStackTrace();
 *  - Guava cache is never fed a null value (it throws NPE on null).
 */
class DimHbaseAsyncFunction extends RichAsyncFunction[String, String] {

  // Thread pool that performs the blocking HBase calls off the Flink mailbox.
  var executorService: ExecutorService = _
  // Local lookup cache: dimension rows expire 2h after last access.
  var cache: Cache[String, String] = _
  // Single shared, thread-safe HBase connection for the operator lifetime.
  var connection: Connection = _

  // Alternative: the async OpenTSDB HBase client
  // http://opentsdb.github.io/asynchbase/javadoc/index.html
  override def open(parameters: Configuration): Unit = {
    // HBase Connection is heavyweight and thread-safe: create it once here
    // instead of once per record in asyncInvoke.
    // Fully qualified because the file's import misspells HBaseConfiguration.
    val conf = org.apache.hadoop.hbase.HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", GlobalConfig.Hbase_ZOOKEEPER_QUORUM)
    conf.set("hbase.zookeeper.property.clientPort", GlobalConfig.Hbase_ZOOKEEPER_PROPERTY_CLIENTPORT)
    connection = ConnectionFactory.createConnection(conf)
    // Fixed pool of 12 worker threads for the blocking lookups.
    executorService = Executors.newFixedThreadPool(12)
    cache = CacheBuilder.newBuilder()
      .concurrencyLevel(12) // allow 12 threads to access the cache concurrently
      .expireAfterAccess(2, TimeUnit.HOURS)
      .maximumSize(10000)
      .build()
  }

  override def close(): Unit = {
    ExecutorUtils.gracefulShutdown(100, TimeUnit.MILLISECONDS, executorService)
    // Release the shared HBase connection (original leaked one per record).
    if (connection != null) {
      connection.close()
    }
  }

  /** On async timeout, complete the future so the stream does not stall. */
  override def timeout(input: String, resultFuture: ResultFuture[String]): Unit = {
    resultFuture.complete(Array("timeout:" + input))
  }

  /**
   * Submit the blocking HBase join to the worker pool; complete the future
   * with the enriched JSON, then persist it into the wide table.
   */
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    executorService.submit(new Runnable {
      override def run(): Unit = {
        try {
          val resultJsonObject = getHbaseJoinData(input, connection, cache)
          resultFuture.complete(Array(resultJsonObject.toJSONString))
          writeDataToHbase(connection, resultJsonObject)
        } catch {
          // printStackTrace() returns Unit — report the message instead.
          case e: Exception => resultFuture.complete(Array("error:" + e.getMessage))
        }
      }
    })
  }

  /**
   * Look up ad / website / vip dimension rows by ad_id, siteid and vip_id
   * (each suffixed with dn), preferring the Guava cache over HBase, and merge
   * the dimension columns into the input JSON object.
   *
   * @param input      raw member record as a JSON string
   * @param connection shared HBase connection
   * @param cache      lookup cache (key prefixes: adname:/siteDetail:/vipDetail:)
   * @return the input JSON enriched with all dimension columns
   */
  def getHbaseJoinData(input: String, connection: Connection, cache: Cache[String, String]): JSONObject = {
    val jsonObject = ParseJsonData.getJsonData(input)
    // Dimension keys used against the HBase tables.
    val ad_id = jsonObject.getString("ad_id")
    val siteid = jsonObject.getString("siteid")
    val vip_id = jsonObject.getString("vip_id")
    val dn = jsonObject.getString("dn")
    var adname: String = ""
    var sitename: String = ""
    var siteurl: String = ""
    var delete: String = ""
    var site_createtime: String = ""
    var site_creator: String = ""
    var vip_level: String = ""
    var vip_start_time: String = ""
    // NOTE(review): "vip_end_tiem" is misspelled but is a persisted column
    // qualifier — kept as-is for downstream compatibility.
    var vip_end_tiem: String = ""
    var last_modify_time = ""
    var max_free = ""
    var min_free = ""
    var next_level = ""
    var operator = ""
    // Ad dimension: try cache first, then HBase.
    adname = cache.getIfPresent("adname:" + ad_id + "_" + dn)
    if (adname == null || "".equals(adname)) {
      val table = connection.getTable(TableName.valueOf("education:dwd_basead"))
      val get = new Get(Bytes.toBytes(ad_id + "_" + dn)).addColumn(Bytes.toBytes("info"), Bytes.toBytes("adname"))
      val result = table.get(get)
      for (cell <- result.rawCells()) {
        adname = Bytes.toString(CellUtil.cloneValue(cell))
      }
      // Guava Cache rejects null values — only cache a real hit.
      if (adname != null && !"".equals(adname)) {
        cache.put("adname:" + ad_id + "_" + dn, adname)
      }
      table.close()
    }
    // Website dimension: sitename, siteurl, etc.
    if (!"".equals(siteid)) {
      val siteDetail = cache.getIfPresent("siteDetail:" + siteid + "_" + dn) // cache first
      if (siteDetail == null || "".equals(siteDetail)) {
        val table = connection.getTable(TableName.valueOf("education:dwd_basewebsite"))
        val get = new Get(Bytes.toBytes(siteid + "_" + dn)).addFamily(Bytes.toBytes("info"))
        val result = table.get(get)
        for (cell <- result.rawCells()) {
          val colName = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          colName match {
            case "sitename" => sitename = value
            case "siteurl" => siteurl = value
            case "delete" => delete = value
            case "createtime" => site_createtime = value
            case "creator" => site_creator = value
            case _ => null
          }
        }
        // Pack the row into JSON and cache it.
        val siteJson = new JSONObject()
        siteJson.put("sitename", sitename)
        siteJson.put("siteurl", siteurl)
        siteJson.put("delete", delete)
        siteJson.put("site_createtime", site_createtime)
        siteJson.put("site_creator", site_creator)
        cache.put("siteDetail:" + siteid + "_" + dn, siteJson.toJSONString)
        table.close()
      } else {
        // Cache hit: parse the cached JSON.
        val siteJson = ParseJsonData.getJsonData(siteDetail)
        sitename = siteJson.getString("sitename")
        siteurl = siteJson.getString("siteurl")
        delete = siteJson.getString("delete")
        site_createtime = siteJson.getString("site_createtime")
        site_creator = siteJson.getString("site_creator")
      }
    }
    // Vip dimension.
    if (!"".equals(vip_id)) {
      val vipDetail = cache.getIfPresent("vipDetail:" + vip_id + "_" + dn) // cache first
      if (vipDetail == null || "".equals(vipDetail)) {
        val table = connection.getTable(TableName.valueOf("education:dwd_membervip"))
        val get = new Get(Bytes.toBytes(vip_id + "_" + dn)).addFamily(Bytes.toBytes("info"))
        val result = table.get(get)
        for (cell <- result.rawCells()) {
          val colName = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          colName match {
            case "vip_level" => vip_level = value
            case "vip_start_time" => vip_start_time = value
            case "vip_end_time" => vip_end_tiem = value
            case "last_modify_time" => last_modify_time = value
            case "max_free" => max_free = value
            case "min_free" => min_free = value
            case "next_level" => next_level = value
            case "operator" => operator = value
            case _ => null
          }
        }
        // Pack the row into JSON and cache it.
        val vipJson = new JSONObject()
        vipJson.put("vip_level", vip_level)
        vipJson.put("vip_start_time", vip_start_time)
        vipJson.put("vip_end_tiem", vip_end_tiem)
        vipJson.put("last_modify_time", last_modify_time)
        vipJson.put("max_free", max_free)
        vipJson.put("min_free", min_free)
        vipJson.put("next_level", next_level)
        vipJson.put("operator", operator)
        cache.put("vipDetail:" + vip_id + "_" + dn, vipJson.toJSONString)
        table.close()
      } else {
        // Cache hit: parse the cached JSON.
        val vipJson = ParseJsonData.getJsonData(vipDetail)
        vip_level = vipJson.getString("vip_level")
        vip_start_time = vipJson.getString("vip_start_time")
        vip_end_tiem = vipJson.getString("vip_end_tiem")
        last_modify_time = vipJson.getString("last_modify_time")
        max_free = vipJson.getString("max_free")
        min_free = vipJson.getString("min_free")
        next_level = vipJson.getString("next_level")
        operator = vipJson.getString("operator")
      }
    }
    // Merge all dimension columns into the source record.
    jsonObject.put("adname", adname)
    jsonObject.put("sitename", sitename)
    jsonObject.put("siteurl", siteurl)
    jsonObject.put("delete", delete)
    jsonObject.put("site_createtime", site_createtime)
    jsonObject.put("site_creator", site_creator)
    jsonObject.put("vip_level", vip_level)
    jsonObject.put("vip_start_time", vip_start_time)
    jsonObject.put("vip_end_tiem", vip_end_tiem)
    jsonObject.put("last_modify_time", last_modify_time)
    jsonObject.put("max_free", max_free)
    jsonObject.put("min_free", min_free)
    jsonObject.put("next_level", next_level)
    jsonObject.put("operator", operator)
    jsonObject
  }

  /**
   * Persist the enriched record into the wide table `education:dim_member`.
   * Rowkey layout: substring(md5(uid),0,5) + uid + dn (salt spreads regions).
   */
  def writeDataToHbase(connection: Connection, resultJsonObject: JSONObject) = {
    val table = connection.getTable(TableName.valueOf("education:dim_member"))
    val uid = resultJsonObject.getString("uid")
    val dn = resultJsonObject.getString("dn")
    val rowkey = generateHash(uid).substring(0, 5) + uid + dn
    val put = new Put(Bytes.toBytes(rowkey))
    val keySet = resultJsonObject.keySet().toArray
    for (key <- keySet) {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(key.toString),
        Bytes.toBytes(resultJsonObject.getString(key.toString)))
    }
    table.put(put)
    table.close()
  }

  /**
   * Lower-case hex MD5 of the input, zero-padded to 32 chars.
   * Returns null for null input or on digest failure.
   */
  def generateHash(input: String): String = {
    // Early return: the original `if (input == null) { null }` was a dead
    // expression and the code fell through to an NPE on input.getBytes.
    if (input == null) return null
    try {
      val md = MessageDigest.getInstance("MD5")
      md.update(input.getBytes())
      val digest = md.digest()
      val bi = new BigInteger(1, digest)
      var hashText = bi.toString(16)
      // BigInteger drops leading zero nibbles — restore to 32 hex chars.
      while (hashText.length() < 32) {
        hashText = "0" + hashText
      }
      hashText
    } catch {
      case e: Exception => e.printStackTrace(); null
    }
  }
}

11.6.7架构图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679980.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号