用户行为统计是常见需求,当用户点击某按钮或者进入某页面的时候将行为日志发送给后端进行运算保存。通过用户行为日志分析可以更好地捕捉用户喜好,开发出让用户喜欢的产品。
时序图 架构图 环境准备- jdk 1.8
- zookeeper版本:3.4.14
- kafka版本:kafka_2.12-2.3.1
- nginx版本:nginx/1.18.0
- 操作系统:CentOS Linux release 7.4.1708 (Core)
- 安装教程
https://www.runoob.com/w3cnote/zookeeper-setup.html
- 修改/etc/hosts,配置zookeeper域名解析
- 修改kafka安装目录的应用配置文件/config/server.properties
- 设置zookeeper地址:
zookeeper.connect=zk1:2181
host设置为使用hostname而不是ip,避免连接缓慢
2. 设置broker.id(在集群中必须唯一)broker.id=x
- 设置日志文件存储位置(也可使用默认位置)
log.dirs=/xxx/xxx
- 配置允许kafka远程访问
advertised.listeners=PLAINTEXT://172.23.x.x:9092
- 其余配置
#禁止自动创建主题 auto.create.topics.enable=false #允许删除主题 delete.topic.enable=true
- 配置文档:
http://kafkadoc.beanmr.com/030_configuration/01_configuration_cn.html#brokerconfigs
- 在kafka安装目录输入以下命令启动kafka
bin/kafka-server-start.sh config/server.properties &
- kafka集群安装好后,在zookeeper安装目录执行以下命令连接zookeeper
./bin/zkCli.sh -server zk1:2181
- 通过zookeeper节点查看kafka状态
命令:ls /brokers/ids
结果:
- 在kafka安装目录执行以下脚本:
bin/kafka-topics.sh --create --zookeeper 172.23.x.x:2181 --replication-factor 1 --partitions 2 --topic usr-log
- 创建成功后,在kafka安装目录输入命令查看topic分片情况
bin/kafka-topics.sh --describe --zookeeper 172.23.x.x:2181 usr-log
topic被存储在2个分区中,并且只有一个leader副本,不做冗余备份。
-
官方文档地址
https://github.com/brg-liuwei/ngx_kafka_module -
安装流程
- 安装插件前需先安装kafka c/c++客户端环境(librdkafka)
- 安装好kafka客户端环境后输入以下命令设置系统全局配置更新
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
- 安装ngx_kafka_module插件
- 注意事项:
安装插件时,make install会覆盖原有nginx模块,若在已安装的nginx进行操作,执行configure时记得带上原模块,例如:
./configure --prefix=/usr/local/src/nginx_1.18.0 --with-http_ssl_module --add-module=/usr/local/src/ngx_kafka_module
-
官方安装核心步骤图
- 修改nginx.conf文件,配置请求转发至kafka
http {
kafka;
kafka_broker_list 172.23.x.x:9092 172.23.x.x:9092;
server {
listen 80;
server_name localhost;
location /log/usr {
kafka_topic usr-log;
}
}
}
- 官方配置文档
https://github.com/brg-liuwei/ngx_kafka_module - 启动nginx
- 执行命令模拟客户端向nginx发送操作日志
curl -d '{"uid": 1, "action": "2"}' -H 'Content-Type: application/json' http://localhost/log/usr
- 使用kafka终端消费脚本观察日志是否发送至kafka
./kafka-console-consumer.sh --bootstrap-server 172.23.x.x:9092,172.23.x.x:9092 --topic usr-log使用flink计算并且保存结果
- 添加maven依赖
1.10.3 1.18.10 1.2.49 3.5.1 UTF-8 UTF-8 UTF-8 org.apache.flink flink-java ${flink.version} org.apache.flink flink-clients_2.12 ${flink.version} org.apache.flink flink-streaming-java_2.12 ${flink.version} org.apache.flink flink-jdbc_2.12 ${flink.version} provided org.apache.flink flink-connector-kafka_2.12 ${flink.version} org.projectlombok lombok provided ${lombok.version} com.alibaba fastjson ${fastjson.version} log4j log4j 1.2.17 org.slf4j slf4j-log4j12 1.7.25
- 编写flume程序从kafka抓取nginx日志,并保存到数据库
- 编写启动类,拉取kafka消息
// 构建流执行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // kafka 配置 Properties props = new Properties(); props.put("bootstrap.servers", "172.23.x.x:9092,172.23.x.x:9092"); props.put("zookeeper.connect", "zk1:2181"); props.put("group.id", "flink"); props.put("client.id", "flink-1"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 props.put("auto.offset.reset", "latest"); props.put("enable.auto.commit", "true"); DataStreamSourcedataStreamSource = env.addSource( new FlinkKafkaConsumer ( "usr-log", new SimpleStringSchema(), props)) //单线程打印,控制台不乱序,不影响结果 .setParallelism(1); //从kafka里读取数据 dataStreamSource.timeWindowAll( Time.seconds(5L) ).apply(new AllWindowFunction , TimeWindow>() { @Override public void apply(TimeWindow window, Iterable iterableValues, Collector - > out) throws Exception {
List
strList = Lists.newArrayList(iterableValues); if ( strList.isEmpty() ){ return; } out.collect(strList); } }).addSink( new JdbcPersistence() );sink 到数据库 env.execute("nginx log analyse running"); - 编写数据持久类,保存统计结果到数据库
public class JdbcPersistence extends RichSinkFunction
- > {
@Override
public void invoke(List
values, Context context) throws Exception { //TODO 解析日志 for (String log : values) { System.out.println(log); } //TODO 构建统计结果 //TODO 入库 } } - 消费结果:



