栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

StarRocks集群 实时日志分析

StarRocks集群 实时日志分析

StarRocks 实时日志分析

    日志实时同步

    技术路径

filebeat+kafka+starrocks

filebeat 部署

# 下载安装包
https://www.elastic.co/cn/downloads/beats/filebeat
# tar解压
tar -zvxf filebeat-8.0.0-linux-x86_64.tar.gz

配置文件

## 新建配置文件
cp filebeat.yml filebeat-kafka.yml
vim filebeat-kafka.yml	
## 配置文件设置如下
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

# The log input tails the StarRocks FE audit log files listed under `paths`.
- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /opt/StarRocks-1.19.6/fe/log/fe.audit.log*


output.kafka:
  # Initial brokers for reading cluster metadata; list each broker once.
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # Topic the audit log events are published to.
  topic: starocks_log_prd
  partition.round_robin:
    # Only publish to partitions whose leader is currently reachable.
    reachable_only: true
  # keep_alive is an option of the Kafka output itself, not of the partitioner,
  # so it must sit at the output.kafka level to take effect.
  keep_alive: 10s

启动脚本,抽取日志

 nohup ./filebeat -c filebeat-kafka.yml &

查看kafka数据

{
  "@timestamp": "2022-03-02T06:44:40.814Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.0.0"
  },
  "input": {
    "type": "log"
  },
  "agent": {
    "version": "8.0.0",
    "ephemeral_id": "a2187d49-ce13-4d83-8a50-49a86fc183b9",
    "id": "dfa6ed24-0493-4150-9af6-d849533b2561",
    "name": "p7bdsrdb104",
    "type": "filebeat"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "p7bdsrdb104",
    "architecture": "x86_64",
    "os": {
      "codename": "Maipo",
      "type": "linux",
      "platform": "rhel",
      "version": "7.8 (Maipo)",
      "family": "redhat",
      "name": "Red Hat Enterprise Linux Server",
      "kernel": "3.10.0-1127.el7.x86_64"
    },
    "id": "266664a41f7840a8a45060a4f42aee67",
    "containerized": false,
    "ip": [
      "172.25.5.104",
      "fe80::3a68:ddff:fe60:5e38"
    ],
    "mac": [
      "38:68:dd:60:5e:38",
      "80:61:5f:11:26:7c",
      "80:61:5f:11:26:7d",
      "38:68:dd:60:5e:39",
      "38:68:dd:60:5e:38",
      "80:61:5f:11:26:7e",
      "80:61:5f:11:26:7f",
      "38:68:dd:60:5e:3b",
      "38:68:dd:60:5e:38"
    ],
    "hostname": "p7bdsrdb104"
  },
  "log": {
    "offset": 1657558,
    "file": {
      "path": "/opt/StarRocks-2.1.0/fe/log/fe.audit.log.20220228-1"
    }
  },
  "message": "2022-02-28 18:04:33,955 [query] |Client\u003d1|User\u003ddefault_cluster:apptwork|Db\u003ddefault_cluster:information_schema|State\u003dERR|Time\u003d1|ScanBytes\u003d0|ScanRows\u003d0|ReturnRows\u003d0|StmtId\u003d2451|QueryId\u003dd42aa07f-987d-11ec-a5cb-3868dd605e38|IsQuery\u003dfalse|feIp\u003d172.25.5.104|Stmt\u003dSELECt  @@session.auto_increment_increment AS auto_increment_increment, @@character_set_client AS character_set_client, @@character_set_connection AS character_set_connection, @@character_set_results AS character_set_results, @@character_set_server AS character_set_server, @@collation_server AS collation_server, @@collation_connection AS collation_connection, @@init_connect AS init_connect, @@interactive_timeout AS interactive_timeout, @@license AS license, @@lower_case_table_names AS lower_case_table_names, @@max_allowed_packet AS max_allowed_packet, @@net_buffer_length AS net_buffer_length, @@net_write_timeout AS net_write_timeout, @@performance_schema AS performance_schema, @@query_cache_size AS query_cache_size, @@query_cache_type AS query_cache_type, @@sql_mode AS sql_mode, @@system_time_zone AS system_time_zone, @@time_zone AS time_zone, @@transaction_isolation AS transaction_isolation, @@wait_timeout AS wait_timeout|Digest\u003d"
}

    实时接入StarRocks

    构建StarRocks表

    -- StarRocks audit-log table. Each row stores one raw log line (`message`)
    -- together with the event timestamp delivered in the Kafka JSON record.
    -- The table is range-partitioned by day on `@timestamp`; the dynamic
    -- partition settings below use start = -10 and end = 3, i.e. a window from
    -- 10 days back to 3 days ahead (about ten days of retained log data).
    CREATE table app.starocks_sql_log(
      `message` String NOT null,
      `@timestamp` datetime  COMMENT "数据写入时间"
    ) ENGINE=OLAP 
    DUPLICATE  KEY(message)
    COMMENT "OLAP"
    PARTITION BY RANGE(`@timestamp`)
    (
    -- Initial partition; further daily partitions come from dynamic partitioning.
    PARTITION p20220305 VALUES [('2022-03-05 00:00:00'), ('2022-03-06 00:00:00')))
    DISTRIBUTED BY HASH(message) BUCKETS 8 
    PROPERTIES (
    "replication_num" = "1",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.time_zone" = "Asia/Shanghai",
    "dynamic_partition.start" = "-10",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "8",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
    );
    
    -- Backfill daily partitions in bulk. Dynamic partitioning is switched off
    -- while the partitions are added manually and switched back on afterwards.
    -- Every statement is terminated with ';' so the three lines can be run
    -- together as one script.
    ALTER table starocks_sql_log set ("dynamic_partition.enable"="false");
    ALTER table starocks_sql_log ADD PARTITIONS START ("2022-03-01") END ("2022-03-05") EVERY (interval 1 day);
    ALTER table starocks_sql_log set ("dynamic_partition.enable"="true");
    
    

    创建实时任务

    -- Create the Routine Load job that continuously consumes the JSON records
    -- published by Filebeat and loads the `message` and `@timestamp` fields
    -- into starocks_sql_log.
    CREATE ROUTINE LOAD app.routine_load_starocks_sql_log_job ON starocks_sql_log
    COLUMNS TERMINATED BY ",",
    COLUMNS (message,`@timestamp`)
    PROPERTIES
    (
        "format"="json",
        "desired_concurrent_number"="6",
        "max_error_number"="1000"
    )
    FROM KAFKA
    (
        "kafka_broker_list"= "kafka1:9092,kafka2:9092,kafka3:9092",
        -- Must be the same topic that Filebeat's output.kafka writes to.
        "kafka_topic" = "starocks_log_prd"
    );
    
    -- Show the status of all routine load jobs
    SHOW ALL ROUTINE LOAD;
    -- Resume the job
    RESUME ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
    -- Pause the job
    PAUSE ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
    -- Stop the job
    STOP ROUTINE LOAD FOR routine_load_starocks_sql_log_job;
    
    

    创建视图,清洗数据

    -- Create a view that parses the raw audit-log line into columns.
    -- The line is split on '|' and fields are picked by position:
    --   [1]  "<timestamp>,<millis> [<type>] "  -> logtime (first 19 chars) and sqltype (from char 24)
    --   [2]  Client=   [3] User=      [4] Db=        [5] State=   [6] Time=
    --   [8]  ScanRows= [9] ReturnRows= [12] IsQuery= [13] feIp=   [14] Stmt=
    -- Each regexp_replace strips the "Key=" label from its field.
    -- NOTE(review): positions [7], [10], [11] (ScanBytes, StmtId, QueryId in the
    -- sample record) are not exposed by this view.
    -- NOTE(review): a statement that itself contains '|' would presumably be cut
    -- at that character by the split; confirm whether this matters for your logs.
    CREATE  VIEW app.starocks_sql_log_view as
    SELECt  
    SUBSTRING(split(message,'|')[1],1,19) as logtime,
    SUBSTRING(split(message,'|')[1],24) as sqltype,
    regexp_replace(split(message,'|')[2],'Client=','') as client,
    regexp_replace(split(message,'|')[3],'User=','') as username,
    regexp_replace(split(message,'|')[4],'Db=','') as db,
    regexp_replace(split(message,'|')[5],'State=','') as state,
    regexp_replace(split(message,'|')[6],'Time=','') as executetime,
    regexp_replace(split(message,'|')[8],'ScanRows=','') as scanrows,
    regexp_replace(split(message,'|')[9],'ReturnRows=','') as ReturnRows,
    regexp_replace(split(message,'|')[12],'IsQuery=','') as isquery,
    regexp_replace(split(message,'|')[13],'feIp=','') as feip,
    regexp_replace(split(message,'|')[14],'Stmt=','') as stmt
    from starocks_sql_log;
    
    -- Query the view (row count as a quick sanity check)
    SELECt  count(*)  from app.starocks_sql_log_view;
    

    数据分析

    数据结果
    logtime|sqltype|client|username|db|state|executetime|scanrows|ReturnRows|isquery|feip|stmt
    2022-03-02 19:35:16| [query] |10.110.20.67:64741|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
    2022-03-02 19:58:26| [query] |10.110.20.67:65008|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET NAMES utf8mb4
    2022-03-02 19:58:26| [query] |10.110.20.67:65008|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
    2022-03-02 19:59:05| [query] |10.110.20.67:65013|root|default_cluster:example_db|OK|1|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
    2022-03-02 19:59:50| [query] |10.110.20.67:65017|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'
    2022-03-02 20:00:40| [query] |10.110.20.67:65020|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET NAMES utf8mb4
    2022-03-02 20:00:40| [query] |10.110.20.67:65020|root|default_cluster:example_db|OK|0|0|0|false|10.110.37.223|SET sql_mode='STRICT_TRANS_TABLES'

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753088.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号