日志实时同步
技术路径
filebeat+kafka+starrocks
filebeat 部署
# 下载安装包 https://www.elastic.co/cn/downloads/beats/filebeat # tar解压 tar -zvxf filebeat-8.0.0-linux-x86_64.tar.gz
配置文件
## 新建配置文件 cp filebeat.yml filebeat-kafka.yml vim filebeat-kafka.yml ## 配置文件设置如下
filebeat.inputs:
  # Each "-" is an input. Most options can be set at the input level,
  # so different inputs can use different configurations.
  - type: log
    # Change to true to enable this input configuration.
    enabled: true
    # Glob-based paths to crawl; the trailing "*" also picks up
    # rotated audit-log files (fe.audit.log.20220228-1, ...).
    paths:
      - /opt/StarRocks-1.19.6/fe/log/fe.audit.log*

output.kafka:
  # Initial brokers used for reading cluster metadata.
  # FIX: the original listed "kafka1:9092" three times; use the three
  # distinct brokers, matching the Routine Load broker list.
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # Topic the Routine Load job must consume from.
  topic: starocks_log_prd
  partition.round_robin:
    # Only send to partitions whose leader is currently reachable.
    reachable_only: true
  keep_alive: 10s
启动脚本,抽取日志
nohup ./filebeat -c filebeat-kafka.yml &
查看kafka数据
{
"@timestamp": "2022-03-02T06:44:40.814Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.0.0"
},
"input": {
"type": "log"
},
"agent": {
"version": "8.0.0",
"ephemeral_id": "a2187d49-ce13-4d83-8a50-49a86fc183b9",
"id": "dfa6ed24-0493-4150-9af6-d849533b2561",
"name": "p7bdsrdb104",
"type": "filebeat"
},
"ecs": {
"version": "8.0.0"
},
"host": {
"name": "p7bdsrdb104",
"architecture": "x86_64",
"os": {
"codename": "Maipo",
"type": "linux",
"platform": "rhel",
"version": "7.8 (Maipo)",
"family": "redhat",
"name": "Red Hat Enterprise Linux Server",
"kernel": "3.10.0-1127.el7.x86_64"
},
"id": "266664a41f7840a8a45060a4f42aee67",
"containerized": false,
"ip": [
"172.25.5.104",
"fe80::3a68:ddff:fe60:5e38"
],
"mac": [
"38:68:dd:60:5e:38",
"80:61:5f:11:26:7c",
"80:61:5f:11:26:7d",
"38:68:dd:60:5e:39",
"38:68:dd:60:5e:38",
"80:61:5f:11:26:7e",
"80:61:5f:11:26:7f",
"38:68:dd:60:5e:3b",
"38:68:dd:60:5e:38"
],
"hostname": "p7bdsrdb104"
},
"log": {
"offset": 1657558,
"file": {
"path": "/opt/StarRocks-2.1.0/fe/log/fe.audit.log.20220228-1"
}
},
"message": "2022-02-28 18:04:33,955 [query] |Clientu003d1|Useru003ddefault_cluster:apptwork|Dbu003ddefault_cluster:information_schema|Stateu003dERR|Timeu003d1|ScanBytesu003d0|ScanRowsu003d0|ReturnRowsu003d0|StmtIdu003d2451|QueryIdu003dd42aa07f-987d-11ec-a5cb-3868dd605e38|IsQueryu003dfalse|feIpu003d172.25.5.104|Stmtu003dSELECt @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@collation_server AS collation_server, @@collation_connection AS collation_connection, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@performance_schema AS performance_schema, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@transaction_isolation AS transaction_isolation, @@wait_timeout AS wait_timeout|Digestu003d"
}
实时接入starrocks
构建 starrocks 表
-- StarRocks audit-log table. Dynamic partitioning retains ~10 days of
-- data: partitions older than dynamic_partition.start = -10 days are
-- dropped automatically; end = 3 pre-creates partitions 3 days ahead.
CREATE TABLE app.starocks_sql_log (
    `message`    STRING   NOT NULL COMMENT "raw fe.audit.log line",
    -- FIX: range-partition columns must be NOT NULL in StarRocks;
    -- the column is always populated by filebeat's @timestamp field.
    `@timestamp` DATETIME NOT NULL COMMENT "event time written by filebeat"
) ENGINE = OLAP
DUPLICATE KEY(`message`)
COMMENT "fe.audit.log lines streamed via filebeat + kafka"
PARTITION BY RANGE(`@timestamp`)
(
    -- Half-open interval [start, end): one seed partition per day.
    PARTITION p20220305 VALUES [("2022-03-05 00:00:00"), ("2022-03-06 00:00:00"))
)
DISTRIBUTED BY HASH(`message`) BUCKETS 8
PROPERTIES (
    "replication_num" = "1",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-10",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);
-- Backfill historical partitions in bulk.
-- Dynamic partitioning must be disabled first — otherwise ADD PARTITIONS
-- for dates outside the dynamic window is rejected — then re-enabled.
-- FIX: the first and last statements were missing their terminating ";",
-- which breaks running this as a script.
ALTER TABLE starocks_sql_log SET ("dynamic_partition.enable" = "false");
ALTER TABLE starocks_sql_log ADD PARTITIONS START ("2022-03-01") END ("2022-03-05") EVERY (INTERVAL 1 DAY);
ALTER TABLE starocks_sql_log SET ("dynamic_partition.enable" = "true");
创建实时任务
-- Routine Load job: continuously consume the filebeat topic into the
-- log table. JSON fields are matched to columns by name ("message",
-- "@timestamp"), so no jsonpaths mapping is required here.
CREATE ROUTINE LOAD app.routine_load_starocks_sql_log_job ON starocks_sql_log
COLUMNS (message, `@timestamp`)
PROPERTIES
(
    -- FIX: removed `COLUMNS TERMINATED BY ","` — a column separator is
    -- meaningless for JSON-formatted source data.
    "format" = "json",
    "desired_concurrent_number" = "6",
    -- Tolerate up to 1000 malformed rows per sampling window before
    -- the job is paused.
    "max_error_number" = "1000"
)
FROM KAFKA
(
    "kafka_broker_list" = "kafka1:9092,kafka2:9092,kafka3:9092",
    -- FIX: must match the topic filebeat publishes to (see the
    -- filebeat output.kafka config); "starocks_log" does not exist.
    "kafka_topic" = "starocks_log_prd"
);
-- Inspect all Routine Load jobs (state, consumed offsets, error rows).
SHOW ALL ROUTINE LOAD;
-- Resume a paused job. Note: this is a resume, not a restart —
-- consumption continues from the saved Kafka offsets.
RESUME ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
-- Pause the job; it can be resumed later.
PAUSE ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
-- Stop the job permanently; a stopped job cannot be resumed.
STOP ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
创建视图,清洗数据
-- Parse the raw audit-log line into named columns: split on '|' and
-- strip each "Key=" prefix. Field 1 carries both the timestamp
-- (chars 1-19) and the statement type (from char 24 onward).
CREATE VIEW app.starocks_sql_log_view AS
SELECT
    SUBSTRING(split(message, '|')[1], 1, 19)                  AS logtime,
    SUBSTRING(split(message, '|')[1], 24)                     AS sqltype,
    regexp_replace(split(message, '|')[2], 'Client=', '')     AS client,
    regexp_replace(split(message, '|')[3], 'User=', '')       AS username,
    regexp_replace(split(message, '|')[4], 'Db=', '')         AS db,
    regexp_replace(split(message, '|')[5], 'State=', '')      AS state,
    regexp_replace(split(message, '|')[6], 'Time=', '')       AS executetime,
    regexp_replace(split(message, '|')[8], 'ScanRows=', '')   AS scanrows,
    regexp_replace(split(message, '|')[9], 'ReturnRows=', '') AS ReturnRows,
    regexp_replace(split(message, '|')[12], 'IsQuery=', '')   AS isquery,
    regexp_replace(split(message, '|')[13], 'feIp=', '')      AS feip,
    regexp_replace(split(message, '|')[14], 'Stmt=', '')      AS stmt
FROM starocks_sql_log;

-- Sanity check: count the rows visible through the view.
SELECT count(*) FROM app.starocks_sql_log_view;
数据分析
数据结果
logtime|sqltype|client|username|db|state|executetime|scanrows|ReturnRows|isquery|feip|stmt
2022-03-02 19:35:16| [query] |10.110.20.67:64741|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
2022-03-02 19:58:26| [query] |10.110.20.67:65008|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET NAMES utf8mb4
2022-03-02 19:58:26| [query] |10.110.20.67:65008|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
2022-03-02 19:59:05| [query] |10.110.20.67:65013|root|default_cluster:example_db|OK|1|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
2022-03-02 19:59:50| [query] |10.110.20.67:65017|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
2022-03-02 20:00:40| [query] |10.110.20.67:65020|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET NAMES utf8mb4
2022-03-02 20:00:40| [query] |10.110.20.67:65020|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'



