栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Clickhouse

Clickhouse

生生不息,“折腾”不止;Java晋升指北,让天下没有难学的技术;视频教程资源共享,学习不难,坚持不难,坚持学习很难; >>>>


一、MergeTree 系列 1.1 数据TTL

Time To Live,数据存活的时间

在MergeTree中,可以为某个列字段、也可以为整张表设置TTL,如果同时设置了,则会以先到期的为主

# 列级别TTL
CREATE TABLE ttl_table_v1(
    id String,
    create_time DateTime,
    code String TTL create_time + INTERVAL 10 SECOND,
    type UInt8 TTL create_time + INTERVAL 10 SECOND
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(create_time)
ORDER BY id

如果想要修改列字段的TTL,或是为已有字段添加TTL,则可以使用ALTER语句

ALTER TABLE ttl_table_v1 MODIFY COLUMN code String TTL create_time + INTERVAL 1 DAY

目前,ClickHouse没有提供取消列级别、表级别TTL方法

1.1.1 运行机制

TTL运行机制:


如果一张MergeTree表被设置了TTL表达式,那么在写入数据时,会以数据分区为单位,在每个分区目录内生成一个名为ttl.txt的文件

CREATE TABLE ttl_table
(
    `id` String, 
    `createTime` DateTime, 
    `code` String TTL createTime + toIntervalMinute(1)
)
ENGINE = MergeTree
ORDER BY createTime
[root@iZwz9cs3943soqusmlnb7tZ all_1_1_0]# pwd
/var/lib/clickhouse/data/default/ttl_table/all_1_1_0
[root@iZwz9cs3943soqusmlnb7tZ all_1_1_0]# ls
checksums.txt  code.bin  code.mrk2  columns.txt  count.txt  createTime.bin  createTime.mrk2  id.bin  id.mrk2  primary.idx  ttl.txt
[root@iZwz9cs3943soqusmlnb7tZ all_1_1_0]# 
[root@iZwz9cs3943soqusmlnb7tZ all_1_1_0]# cat ttl.txt 
ttl format version: 1
{"columns":[{"name":"code","min":1629725365,"max":1629725365}]}[root@iZwz9cs3943soqusmlnb7tZ all_1_1_0]# 
    在写入数据之后,会在分区目录生成 ttl.txt 文件通过 cat ttl.txt 查看,MergeTree是通过json配置保存了TTL相关信息
      colums:保存列级别TTL信息table:保存表级别TTL信息min:保存当前数据分区内,TTL指定日期字段的最小值+interval max:保存当前数据分区内,TTL指定日期字段的最大值+interval
    只有在MergeTree合并分区时,才会触发删除TTL过期数据的逻辑在选择删除分区时,会使用贪婪算法,它的算法规则则是尽可能找到会最早过期的,同时年纪又是最老的分区如果一个分区内某一列数据因为TTL到期全部被删除
      .bin & .mrk 文件会被删除

TTL默认的合并频率由MergeTree的merge_with_ttl_timeout参数控制,默认86400秒,即1天。它维护的是一个专有的TTL任务队列。有别于MergeTree的常规合并任务,如果这个值被设置的过小,可能会带来性能损耗


TTL默认合并频率

merge_with_ttl_timeout参数控制,默认1天

除了被动触发TTL合并外,也可以使用optimize命令强制触发合并


optimize TABLE table_name

ClickHouse目前虽然没有提供删除TTL声明的方法,但是提供了控制全局TTL合并任务的启停方法


SYSTEM STOP/START TTL MERGES

1.2 多路径存储(了解)

默认策略

无需任何配置,自动保存在config.xml的 路径下 JBOD策略

Just a Bunch of Disks一种轮询策略,每执行一次INSERT或者MERGE,所产生的新分区会轮询写入各个磁盘 HOT/COLD策略

HOT区域使用SSD这类高性能存储媒介,注重存取性能;COLD区域则使用HDD这类高容量存储媒介,注重存取经济性

[root@iZwz9cs3943soqusmlnb7tZ preprocessed_configs]# pwd
/var/lib/clickhouse/preprocessed_configs
[root@iZwz9cs3943soqusmlnb7tZ preprocessed_configs]# ls
config.xml  users.xml
[root@iZwz9cs3943soqusmlnb7tZ preprocessed_configs]# 
/var/lib/clickhouse/
1.3. ReplacingMergeTree
CREATE TABLE replace_table(
    id String,
    code String,
    create_time DateTime
)ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY (id,code)
PRIMARY KEY id

# 也可以设置ver版本号
ENGINE = ReplacingMergeTree(ver)

主键去重(一定程度上)
ORDER BY

ORDER BY是去除重复数据的关键,排序键ORDER BY所声明的表达式是后续作为判断数据是否重复的依据ReplacingMergeTree在去除重复数据时,确实是以ORDER BY排序键为基准的,而不是PRIMARY KEYReplacingMergeTree是以分区为单位删除重复数据的。只有在相同的数据分区内重复的数据才可以被删除,而不同数据分区之间的重复数据依然不能被剔除

使用 order by排序键作为判断重复数据的唯一键只有在合并分区的时候,才会触发删除重复数据的逻辑以数据分区为单位删除数据

当同一分区数据,才会被删除不是同一分区的数据,不会被删除 在进行数据去重时,因为分区内的数据已经基于order by进行了排序,所以能够找到相邻的重复数据去重策略

如果没有设置ver版本号,则保留同一组重复数据中的最后一行如果没有设置ver版本号,则保留同一组重复数据中的ver字段取值最大的一行 1.4 SummingMergeTree

CREATE TABLE summing_table(
    id String,
    city String,
    v1 UInt32,
    v2 Float64,
    create_time DateTime
)ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY (id, city)
PRIMARY KEY id

CREATE TABLE summing_table_nested(
    id String,
    nestMap Nested(
        id UInt32,
        key UInt32,
        val UInt64
    ),
    create_time DateTime
)ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id

合并
ORDER BY

使用 order by排序键作为聚合的条件key只要在合并分区的时候,才会触发汇总合并的逻辑以数据分区为单位

同一数据分区的数据,key相同则合并不同数据分区的数据,不会进行合并 如果在定义引擎时,指定了 colunm 汇总列(非主键字段),则sum汇总这些字段,如果没有指定,则聚合所有非主键的数值类型字段在进行数据汇总时,因为分区内的数据已经基于ORBER BY排序,所以能够找到相邻且拥有相同聚合Key的数据在汇总数据时,同一分区内,相同聚合Key的多行数据会合并成一行

汇总字段会进行SUM计算对于那些非汇总字段,则会使用第一行数据的取值 支持嵌套结构,但列字段名称必须以Map后缀结尾

嵌套类型中,默认以第一个字段作为聚合Key任何名称以Key、Id或Type为后缀结尾的字段,都将和第一个字段一起组成复合Key 1.5 AggregatingMergeTree

CREATE TABLE agg_table(
    id String,
    city String,
    code AggregateFunction(uniq,String),
    value AggregateFunction(sum,UInt32),
    create_time DateTime
)ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY (id,city)
PRIMARY KEY id

合并(sum加强)
ORDER BY

在写入数据时,需要调用State函数;而在查询数据时,则需要调用相应的Merge函数。其中,表示定义时使用的聚合函数

AggregatingMergeTree更为常见的应用方式是结合物化视图使用,将它作为物化视图的表引擎

CREATE MATERIALIZED VIEW agg_view
ENGINE = AggregatingMergeTree() 
PARTITION BY city
ORDER BY (id,city)
AS SELECT
    id,
    city,
    uniqState(code) AS code,
    sumState(value) AS value
FROM agg_table_basic
GROUP BY id, city

用ORBER BY排序键作为聚合数据的条件Key使用AggregateFunction字段类型定义聚合函数的类型以及聚合的字段只有在合并分区的时候才会触发聚合计算的逻辑以数据分区为单位来聚合数据

当分区合并时,同一数据分区内聚合Key相同的数据会被合并计算不同分区之间的数据则不会被计算 在聚合数据时,同一分区内,相同聚合Key的多行数据会合并成一行

于那些非主键、非AggregateFunction类型字段,则会使用第一行数据的取值 AggregateFunction类型的字段使用二进制存储,在写入数据时,需要调用State函数;而在查询数据时,则需要调用相应的Merge函数。其中,表示定义时使用的聚合函数AggregatingMergeTree通常作为物化视图的表引擎,与普通MergeTree搭配使用 1.6 CollapsingMergeTree

CREATE TABLE collpase_table(
    id String,
    code Int32,
    create_time DateTime,
    sign Int8
)ENGINE = CollapsingMergeTree(sign)
PARTITION BY toYYYYMM(create_time)
ORDER BY id

折叠
CollapsingMergeTree就是一种通过以增代删的思路,支持行级数据修改和删除的表引擎

如果sign标记为-1,则表示这行数据需要被删除。当CollapsingMergeTree分区合并时,同一数据分区内,sign标记为1和-1的一组数据会被抵消删除。

sign=1 > sign=-1

保留最后一行sign=1的数据 sign=1 < sign=-1

保留第一行sign=-1的数据 sign=1 = sign=-1

最后一行是sign=1保留第一行sign=-1最后一行sign=1的数据 最后一行是sign=-1

什么都操作

CollapsingMergeTree对于写入数据的顺序有着严格要求
如果按照正常顺序写入,先写入sign=1,再写入sign=-1,则能够正常折叠

如果数据的写入程序是单线程执行的,则能够较好地控制写入顺序;如果需要处理的数据量很大,数据的写入程序通常是多线程执行的,那么此时就不能保障数据的写入顺序了

1.7 VersionedCollapsingMergeTree
CREATE TABLE ver_collpase_table(
    id String,
    code Int32,
    create_time DateTime,
    sign Int8,
    ver UInt8
)ENGINE = VersionedCollapsingMergeTree(sign,ver)
PARTITION BY toYYYYMM(create_time)
ORDER BY id

VersionedCollapsingMergeTree对数据的写入顺序没有要求,在同一个分区内,任意顺序的数据都能够完成折叠操作

2. 外部存储引擎 2.1. HDFS

HDFS是一款分布式文件系统,堪称Hadoop生态的基石,HDFS表引擎则能够直接与它对接,读取HDFS内的文件

需要关闭HDFS的Kerberos认证(因为HDFS表引擎目前还不支持Kerberos)

HDFS表引擎通常有两种使用形式

既负责读文件,又负责写文件只负责读文件,文件写入工作则由其他外部系统完成

2.1.1. 只读
ENGINE = HDFS(hdfs_uri,format)

CREATE TABLE hdfs_table1(
    id UInt32,
    code String,
    name String
)ENGINE = HDFS('hdfs://hdp1.nauu.com:8020/clickhouse/hdfs_table1','CSV')

需要在HDFS上创建存放文件的目录需要在HDFS给ClickHouse用户授权hdfs_uri表示HDFS的文件存储路径format表示文件格式(指ClickHouse支持的文件格式,常见的有CSV、TSV和JSON等)

hadoop fs -chown -R clickhouse:clickhouse /clickhouse

目前ClickHouse并没有提供删除HDFS文件的方法,即便将数据表hdfs_table1删除,在HDFS上文件依然存在

2.1.2. 读写

这种形式类似Hive的外挂表,由其他系统直接将文件写入HDFS

通过HDFS表引擎的hdfs_uri和format参数分别与HDFS的文件路径、文件格式建立映射

hdfs_uri支持以下几种常见的配置方法

    绝对路径
      会读取指定路径的单个文件/clickhouse/hdfs_table1
    *通配符
      匹配所有字符/clickhouse/hdfs_table/*
    ?通配符
      匹配单个字符/clickhouse/hdfs_table/organization_?.csv
    {M…N}数字区间
      匹配指定数字的文件/clickhouse/hdfs_table/organization_{1…3}.csv
2.2. MySQL

MySQL表引擎可以与MySQL数据库中的数据表建立映射,并通过SQL向其发起远程查询,包括SELECt和INSERT

ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])

CREATE TABLE dolphin_scheduler_table(
    id UInt32,
    name String
)ENGINE = MySQL('10.37.129.2:3306', 'escheduler', 't_escheduler_process_definition', 'root', '')

host:port

MySQL的地址和端口 database

数据库的名称 table

需要映射的表名称 user

MySQL的用户名 password

MySQL的密码 replace_query

默认为0对应MySQL的REPLACE INTO语法如果将它设置为1,则会用REPLACE INTO代替INSERT INTO on_duplicate_clause

默认为0对应MySQL的ON DUPLICATE KEY语法如果需要使用该设置,则必须将replace_query设置成0

MySQL表引擎不支持任何UPDATE和DELETE操作

2.3. JDBC

相对MySQL表引擎而言,JDBC表引擎不仅可以对接MySQL数据库,还能够与PostgreSQL、SQLite和H2数据库对接

2.3.1. 代理

JDBC表引擎无法单独完成所有的工作,它需要依赖名为clickhouse-jdbc-bridge的查询代理服务

在使用JDBC表引擎之前,首先需要启动clickhouse-jdbc-bridge代理服务

java -jar ./clickhouse-jdbc-bridge-1.0.jar --driver-path /chbase/jdbc-bridge  --listen-host ch5.nauu.com

–driver-path

指定放置数据库驱动的目录例如要代理查询PostgreSQL数据库,则需要将它的驱动jar放置到这个目录 –listen-host

用于代理服务的监听端口,通过这个地址访问代理服务,ClickHouse的jdbc_bridge配置项与此参数对应 2.3.2. 配置

理服务配置好。需要在config.xml全局配置中增加代理服务的访问地址

……
    
        ch5.nauu.com
        9019
    

CREATE TABLE t_ds_process_definition (
    id Int32,
    name String
)ENGINE = JDBC('jdbc:postgresql://ip:5432/dolphinscheduler?user=test&password=test, '', 't_ds_process_definition')
2.4. Kafka

Kafka表引擎能够直接与Kafka系统对接,进而订阅Kafka中的主题并实时接收消息数据

在消息系统中存在三层语义,

    最多一次(At most once)
      可能出现丢失数据的情况,因为在这种情形下,一行数据在消费端最多只会被接收一次
    最少一次(At least once)
      可能出现重复数据的情况,因为在这种情形下,一行数据在消费端允许被接收多次
    恰好一次(Exactly once)
      数据不多也不少

目前ClickHouse还不支持恰好一次(Exactly once)的语义

ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port,... ',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol']
    [kafka_schema = '']
    [kafka_num_consumers = N]
    [kafka_skip_broken_messages = N]
    [kafka_commit_every_batch = N]

kafka_broker_list

Broker服务的地址列表,多个地址之间使用逗号分隔 kafka_topic_list

订阅消息主题的名称列表,多个主题之间使用逗号分隔 kafka_group_name

表示消费组的名称,表引擎会依据此名称创建Kafka的消费组 kafka_format

用于解析消息的数据格式在消息的发送端,必须按照此格式发送消息。数据格式必须是ClickHouse提供的格式之一

TSVJSONEachRowCSV kafka_row_delimiter

判定一行数据的结束符,默认值为’’ kafka_schema

对应Kafka的schema参数 kafka_num_consumers

表示消费者的数量,默认值为1表引擎会依据此参数在消费组中开启相应数量的消费者线程 kafka_skip_broken_messages

当表引擎按照预定格式解析数据出现错误时,允许跳过失败的数据行数,默认值为0,即不允许任何格式错误的情形发生

在此种情形下,只要Kafka主题中存在无法解析的数据,数据表都将不会接收任何数据 kafka_commit_every_batch

表示执行Kafka commit的频率,默认值为0,即当一整个Block数据块完全写入数据表后才执行Kafka commit如果将其设置为1,则每写完一个Batch批次的数据就会执行一次Kafka commit(一次Block写入操作,由多次Batch写入操作组成) 更多参数

Kafka表引擎底层负责与Kafka通信的部分,是基于librdkafka实现的,这是一个由C++实现的Kafka库

librdkafka的原生参数使用了点连接符,在ClickHouse中需要将其改为下划线的形式

在默认情况下,Kafka表引擎每间隔500毫秒会拉取一次数据,时间由stream_poll_timeout_ms参数控制(默认500毫秒)

数据首先会被放入缓存,在时机成熟的时候,缓存数据会被刷新到数据表

触发Kafka表引擎刷新缓存的条件有两个,当满足其中的任意一个时,便会触发刷新动作:

    当一个数据块完成写入的时候
      一个数据块的大小由kafka_max_block_size参数控制,默认情况下
      kafka_max_block_size=max_block_size=65536
    等待间隔超过7500毫秒
      由stream_flush_interval_ms参数控制(默认7500 ms)
2.4.1. 正确方式

Kafka表引擎在执行查询之后就会删除表内的数据

CREATE TABLE kafka_queue(
        id UInt32,
        code String,
        name String
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'hdp1.nauu.com:6667',
    kafka_topic_list = 'sales-queue',
    kafka_group_name = 'chgroup',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 100
CREATE TABLE kafka_table (
    id UInt32,
    code String,
    name String
) ENGINE = MergeTree()
ORDER BY id
CREATE MATERIALIZED VIEW consumer TO kafka_table
AS SELECT id,code,name FROM kafka_queue
    首先是Kafka数据表A,它充当的角色是一条数据管道,负责拉取Kafka中的数据。另外一张任意引擎的数据表B,它充当的角色是面向终端用户的查询表
      在生产环境中通常是MergeTree系列
    最后,是一张物化视图C,它负责将表A的数据实时同步到表B
2.5. File 3. 内存类型引擎 3.1. Memory

Memory表引擎直接将数据保存在内存中,数据既不会被压缩也不会被格式转换

当ClickHouse服务重启的时候,Memory表内的数据会全部丢失

所以在一些场合,会将Memory作为测试表使用

CREATE TABLE memory_1 (
    id UInt64
)ENGINE = Memory()
3.2. Set(持久)

Set表引擎是拥有物理存储的,数据首先会被写至内存,然后被同步到磁盘文件中

当服务重启时,它的数据不会丢失,当数据表被重新装载时,文件数据会再次被全量加载至内存

在Set数据结构中,所有元素都是唯一的

Set表引擎具有去重的能力,在数据写入的过程中,重复的数据会被自动忽略

支持正常的INSERT写入

不能直接使用SELECt对其进行查询,Set表引擎只能间接作为IN查询的右侧条件被查询使用。

SELECT * FROM set_1
DB::Exception: Method read is not supported by storage Set.
CREATE TABLE set_1 (
    id UInt8
)ENGINE = Set()

Set表引擎的存储结构由两部分组成

    [num].bin数据文件
      保存了所有列字段的数据num是一个自增id,从1开始每一批数据的写入(每一次INSERT),会生成一个新的.bin文件,num也会随之加1
    tmp临时目录
      数据文件首先会被写到这个目录,当一批数据写入完毕之后,数据文件会被移出此目录
3.3. Join

同Set

Join表引擎有着更加广泛的应用场景,它既能够作为JOIN查询的连接表,也能够被直接查询使用

ENGINE = Join(join_strictness, join_type, key1[, key2, ...])

CREATE TABLE id_join_tb1(
        id UInt8,
        price UInt32,
    time Datetime
) ENGINE = Join(ANY, LEFT, id)

join_strictness

连接精度,它决定了JOIN查询在连接数据时所使用的策略ALL、ANY、ASOF

当join_type被设置为ANY时,在数据写入时,join_key重复的数据会被自动忽略。 join_type

连接类型,它决定了JOIN查询组合左右两个数据集合的策略,

交集并集笛卡儿积 目前支持INNER、OUTER和CROSS三种类型 join_key 3.4. Buffer 4. 日志类型引擎 4.1. TinyLog

性能差

每一个列字段都拥有一个与之对应的.bin文件
但是TinyLog既不支持分区,也没有.mrk标记文件

由于没有标记文件,它自然无法支持.bin文件的并行读取操作,所以它只适合在非常简单的场景下使用

4.2. StripeLog

相比TinyLog而言,StripeLog拥有更高的查询性能
拥有.mrk标记文件,支持并行查询

同时其使用了更少的文件描述符(所有数据使用同一个文件保存)

data.bin

数据文件,所有的列字段使用同一个文件保存,它们的数据都会被写入data.bin index.mrk

数据标记,保存了数据在data.bin文件中的位置信息利用数据标记能够使用多个线程,以并行的方式读取data.bin内的压缩数据块,从而提升数据查询的性能。 sizes.json

元数据文件,记录了data.bin和index.mrk大小的信息

4.3. Log

Log表引擎结合了TinyLog表引擎和StripeLog表引擎的长处

5. 接口类型引擎 5.1. Merge

Merge表引擎如同一层使用了门面模式的代理,它本身不存储任何数据,也不支持数据写入

负责合并多个查询的结果集

Merge表引擎可以代理查询任意数量的数据表,这些查询会异步且并行执行,并最终合成一个结果集返回

ENGINE = Merge(database, table_name)

CREATE TABLE test_table_all as test_table_2018 
ENGINE = Merge(currentDatabase(), '^test_table_')

database

数据库名称 table_name

数据表的名称支持使用正则表达式 5.2. Dictionary

Dictionary表引擎是数据字典的一层代理封装,它可以取代字典函数,让用户通过数据表查询字典

字典内的数据被加载后,会全部保存到内存中,所以使用Dictionary表对字典性能不会有任何影响

CREATE TABLE tb_test_flat_dict (
    id UInt64, 
    code String,
    name String
)Engine = Dictionary(test_flat_dict);
5.3. Distributed 6. 其它类型引擎(了解) 6.1. Live View 6.2. Null

如果用户向Null表写入数据,系统会正确返回,但是Null表会自动忽略数据,永远不会将它们保存。

如果用户向Null表发起查询,那么它将返回一张空表

6.3. URL

URL表引擎的作用等价于HTTP客户端,


它可以通过HTTP/HTTPS协议,直接访问远端的REST服务。


当执行SELECT查询的时候,底层会将其转换为GET请求的远程调用。


而执行INSERT查询的时候,会将其转换为POST请求的远程调用

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758372.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号