前言:本文基于springboot分布式系统实现日志的收集与分析,多用于电商项目的秒杀等热点数据。文章中的内容涉及zookeeper(注册中心)、kafka(队列)、Lua语言(日志收集)以及Apache Druid(实时分析)等热门技术。
一、zookeeper安装
注意:安装zookeeper前,需确保已安装jdk1.8_92以上到虚拟机!安装jdk参考上一篇文章 Linux安装jkd1.8。
下载地址:https://zookeeper.apache.org/releases.html
将安装包上传到/usr/local/zookeeper目录下并解压:
[root@localhost apache-zookeeper-3.5.9-bin]# mkdir data [root@localhost apache-zookeeper-3.5.9-bin]# mkdir logs
注意:zookeeper默认加载zoo.cfg文件
重命名:
[root@localhost conf]# pwd /usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf [root@localhost conf]# cp zoo_sample.cfg zoo.cfg
修改zoo.cfg文件,修改数据和日志位置:
[root@localhost conf]# vim zoo.cfg4、启动zookeeper
[root@localhost bin]# ./zkServer.sh start
[root@localhost bin]# ./zkServer.sh status
二、kafka安装
下载地址:http://kafka.apache.org/downloads
将安装包上传到/usr/local/kafka目录下并解压:
[root@localhost kafka]# tar -xvf kafka_2.13-3.0.0.tgz2、创建日志存放目录
在解压目录/usr/local/kafka/kafka_2.13-3.0.0 下创建:
[root@localhost kafka_2.13-3.0.0]# pwd /usr/local/kafka/kafka_2.13-3.0.0 [root@localhost kafka_2.13-3.0.0]# mkdir logs3、修改配置文件
[root@localhost kafka_2.13-3.0.0]# vim config/server.properties
参数介绍:
- localhost : 只监听本机的地址请求, 客户端也只能用 localhost 来请求
- 127.0.0.1 : 同localhost, 在请求上可能有与区分 , 看client的请求吧 . 客户端也只能用127.0.0.1来请求
- 192.168.0.1 : 建议不要用这个 , 局域网不一定是 192.168 段的.
- 0.0.0.0 : 本机的所有地址都监听 , 包含 localhost , 127.0.0.1, 及不同网卡的所有ip地址 , 都监听 .
- 这个是对外提供的地址 , 当client请求到kafka时, 会分发这个地址.
- 有三个地方用到: 集群内其他的broker,生产者,消费者
- 可以不填 , 不填就默认用 listeners 的地址.
1、启动
[root@localhost kafka_2.13-3.0.0]# bin/kafka-server-start.sh config/server.properties
2、验证
[root@localhost kafka_2.13-3.0.0]# ps -ef |grep kafka
出现如下内容表示启动成功!
kafka快速入门请参考文档:https://kafka.apache.org/quickstart
1)下面来简单创建一个主题(即队列 itemaccess ),为下文中的日志收集做准备:
[root@localhost kafka_2.13-3.0.0]# pwd /usr/local/kafka/kafka_2.13-3.0.0 #创建主题 [root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --create --topic itemaccess --partitions 2 --replication-factor 1 #查看已创建主题 [root@localhost kafka_2.13-3.0.0]# bin/kafka-topics.sh --bootstrap-server 192.168.8.116:9092 --list itemaccess
2)启动消费端
[root@localhost kafka_2.13-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.8.116:9092 --topic itemaccess
三、日志收集1、OpenRestry安装
OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务。
关于OpenRestry的学习,可以参考:http://openresty.org/cn/
1.1、下载并解压安装包在目录/tmp下下载安装包:
[root@localhost tmp]# wget https://openresty.org/download/openresty-1.11.2.5.tar.gz
解压:
[root@localhost tmp]# tar -xvf https://openresty.org/download/openresty-1.11.2.5.tar.gz1.2、安装
进入到解压目录进行安装,依次执行以下命令:
[root@localhost tmp]# cd openresty-1.11.2.5/ [root@localhost openresty-1.11.2.5]# ./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module [root@localhost openresty-1.11.2.5]# make [root@localhost openresty-1.11.2.5]# make install
安装完成后,软件会安装到/usr/local/openresty,这里面会包含nginx。
配置:
[root@localhost openresty-1.11.2.5]# vim /etc/profile
刷新:
[root@localhost openresty-1.11.2.5]# source /etc/profile1.4、测试是否可用
1)在/usr/local目录下创建web/items目录:
[root@localhost local]# pwd /usr/local [root@localhost local]# mkdir -p web/items
2)下载百度网页到web目录下:
[root@localhost local]# cd web/items [root@localhost items]# pwd /usr/local/web/items [root@localhost items]# wget www.baidu.com
3)修改/usr/local/openresty/nginx/conf/下的nginx.conf配置文件
[root@localhost conf]# pwd /usr/local/openresty/nginx/conf [root@localhost conf]# vim nginx.conf
加入如下内容:
4)启动nginx,并访问测试
[root@localhost conf]# pwd /usr/local/openresty/nginx/conf [root@localhost conf]# nginx
访问:http://192.168.8.116:8081/items/index.html
使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka
日志收集流程:
用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。
将下载好的lua-resty-kafka-master.zip文件上传到/usr/local/openrestry目录下,并解压。
1)安装unzip命令
[root@localhost openresty]# yum install -y unzip
2)解压
[root@localhost openresty]# pwd /usr/local/openresty [root@localhost openresty]# ls bin COPYRIGHT luajit lualib lua-resty-kafka-master.zip nginx openssl111 pcre pod resty.index site zlib [root@localhost openresty]# unzip lua-resty-kafka-master.zip2.2、修改nginx的配置
修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:
[root@localhost conf]# pwd /usr/local/openresty/nginx/conf [root@localhost conf]# vim nginx.conf2.3、日志收集
用户访问页面的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。
我们定义一个消息格式:
{
"actime": "2021-11-10 16:25:30",
"uri": "http://192.168.8.116/items/index.html",
"ip": "119.123.33.231",
"token": "Bearer JAVAITCAST"
}
1)生产者脚本
在/usr/local/openresty/nginx/lua目录下创建一个lua脚本items-access.lua:
[root@localhost lua]# pwd /usr/local/openresty/nginx/lua [root@localhost lua]# vim items-access.lua
脚本内容如下:
--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
{ host = "192.168.8.116", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer TEST"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))
--去掉访问前缀
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
--页面跳转
ngx.exec(uri)
2.4、修改nginx配置
[root@localhost conf]# pwd /usr/local/openresty/nginx/conf [root@localhost conf]# vim nginx.conf
重启nginx!
请求地址:http://192.168.8.116:8081/web/items/index.html
四、 Apache Druid日志实时分析
Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。
注:需要JDK:java8(8u92+),同时需要笔记本大约 4 个 CPU 和 16 G的内存来运行!
1、下载安装包下载地址:https://druid.apache.org/downloads.html
快速入门:https://druid.apache.org/docs/latest/tutorials/index.html
1)将文件上传至/usr/local/apache-druid目录下:
2)解压安装
[root@localhost apache-druid]# tar -xvf apache-druid-0.22.0-bin.tar.gz3、修改 Apache Druid自带的zookeeper的端口
问题:在单机部署的时候会和原先安装的zookeeper端口2181冲突,如果两个一起启动,那就就需要修改Druid或者zookeeper端口为2182。
1)查看执行文件
[root@localhost bin]# pwd /usr/local/apache-druid/apache-druid-0.22.0/bin [root@localhost bin]# vim start-micro-quickstart
2)看到显示加载的conf文件是micro-quickstart.conf,直接查看该conf文件
[root@localhost apache-druid-0.22.0]# pwd /usr/local/apache-druid/apache-druid-0.22.0 [root@localhost apache-druid-0.22.0]# vim conf/supervise/single-server/micro-quickstart.conf
发现先去端口验证verify bin/verify-default-ports,然后执行in/run-zk conf。
3) 修改bin/verify-default-ports文件中的端口
[root@localhost apache-druid-0.22.0]# pwd /usr/local/apache-druid/apache-druid-0.22.0 [root@localhost apache-druid-0.22.0]# vim bin/verify-default-ports
将@ports数组中的2181改为2182。
4)将zookeeper中的端口改为2182,修改zoo.cfg文件
[root@localhost zk]# pwd /usr/local/apache-druid/apache-druid-0.22.0/conf/zk [root@localhost zk]# ls jvm.config log4j2.xml zoo.cfg [root@localhost zk]# vim zoo.cfg4、启动单机版Apache Druid
[root@localhost apache-druid-0.22.0]# ./bin/start-micro-quickstart
启动后访问:http://192.168.3.10:8888/,默认的端口是8888
- 点击Load data->Local disk->Connect data
2)导入数据
我们要导入的数据在/tmp/apache-druid-0.21.1/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz(在安装目录下),需要把该文件的相对路径填写到右边表单中,再点击Apply。
3) 解析数据
在上一个步骤上点击Next:Parse data
4) 解析时间
在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动创建该列!
5) 数据分区设置
点击下一步一直到Partition,Segment granularity选择day
- Segment granularity:分片文件每个segment包含的时间戳范围
- Partitioning type:分区类型
- Max rows per segment:用于分片。确定每个段中的行数。
6) 设置数据源
将默认名称从 更改 wikiticker-2015-09-12-sampled为wikipedia
7)提交数据
参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
1)加载数据
2)配置Kafka源
topic设置为上文kafka所创建的itemaccess。
3)配置数据源名字
其他的步骤和之前文件导入一样。
查询:
1、导入依赖
org.apache.calcite.avatica avatica-core 1.15.0
2、测试
public static void main(String[] args) throws Exception{
//链接地址
String url = "jdbc:avatica:remote:url=http://192.168.8.116:8082/druid/v2/sql/avatica/";
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);
//SQL语句,查询2021-11-10 21:50:30之后的访问uri和访问数量
String sql="SELECt uri,count(*) AS "viewcount" FROM(SELECt * FROM "itemlogs" WHERe __time>'2021-11-10 21:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";
//创建Statment
AvaticaStatement statement = connection.createStatement();
//执行查询
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
//获取uri
String uri = resultSet.getString("uri");
String viewcount = resultSet.getString("viewcount");
System.out.println(uri+"--------->"+viewcount);
}
}
7、拓展
Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:
[root@k8s-master1 apache-druid-0.21.1]# sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`
五、springboot整合druid
采用elastic-job定时器来实时查询热点数据。
1、导入依赖2、yaml配置mysql mysql-connector-java 8.0.15 com.github.kuhn-he elastic-job-lite-spring-boot-starter 2.1.5 org.apache.calcite.avatica avatica-core 1.15.0 com.alibaba druid 1.1.12 org.springframework.boot spring-boot-starter-data-redis 2.1.4.RELEASE org.redisson redisson-spring-boot-starter 3.11.0
server:
port: 18084
spring:
application:
name: monitor
datasource:
driver-class-name: org.apache.calcite.avatica.remote.Driver
url: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
type: com.alibaba.druid.pool.DruidDataSource
cloud:
nacos:
config:
file-extension: yaml
server-addr: 192.168.3.10:8848
discovery:
#Nacos的注册地址
server-addr: 192.168.3.10:8848
redis:
cluster:
nodes:
- 192.168.3.10:7001
- 192.168.3.10:7002
- 192.168.3.10:7003
- 192.168.3.10:7004
- 192.168.3.10:7005
- 192.168.3.10:7006
#elasticjob
elaticjob:
zookeeper:
server-lists: 192.168.3.10:2181
namespace: monitortask
#Druid
druidurl: jdbc:avatica:remote:url=http://192.168.3.10:8082/druid/v2/sql/avatica/
logging:
pattern:
console: "%msg%n"
3、热点数据查询
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.calcite.avatica.AvaticaConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Component
public class MonitorItemsAccess {
@Value("${druidurl}")
private String druidurl;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DruidDataSource dataSource;
public List loadData() throws Exception{
//获取连接对象
//Connection connection = (AvaticaConnection) DriverManager.getConnection(druidurl);
Connection connection =dataSource.getConnection();
//Statement
Statement statement = connection.createStatement();
//执行查询
ResultSet resultSet = statement.executeQuery(druidSQL());
//解析结果集
List ids = new ArrayList();
while (resultSet.next()){
String uri = resultSet.getString("uri");
uri=uri.replace("/web/items/","").replace(".html","");
ids.add(uri);
}
//关闭资源
resultSet.close();
statement.close();
connection.close();
return ids;
}
public String druidSQL(){
//SQL语句
String prefix="SELECT COUNT(*) AS "viewCount",uri FROM logsitems WHERe __time>=CURRENT_TIMESTAMP - INTERVAL '1' HOUR";
//后部分
String suffix=" GROUP BY uri HAVINg viewCount>2";
//SQL中间部分 AND uri NOT IN ('/web/items/S1235433012716498944.html')
//SKU_S1235433012716498944
String sql = "";
//基于Redis中存的热点商品的key来过滤排除要查询的数据
Set keys = redisTemplate.keys("SKU_*");//所有以SKU_开始的key全部查询出来
if(keys!=null && keys.size()>0){
sql=" AND uri NOT IN (";
for (String key : keys) {
sql+="'/web/items/"+key.substring(4)+".html',";
}
sql=sql.substring(0,sql.length()-1);
sql+=")";
}
return prefix+sql+suffix;
}
}
4、定时查询热点数据
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.elasticjob.lite.annotation.ElasticSimpleJob;
import com.seckill.goods.feign.SkuFeign;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@ElasticSimpleJob(
cron = "1/5 * * * * ?",
jobName = "monitortask",
shardingTotalCount = 1
)
public class MonitorTask implements SimpleJob{
@Autowired
private MonitorItemsAccess monitorItemsAccess;
@SneakyThrows
@Override
public void execute(ShardingContext shardingContext) {
List ids = monitorItemsAccess.loadData();
for (String id : ids) {
System.out.println("热点商品ID:"+id);
}
}
}
自此,完成!!!



