生生不息,“折腾”不止;Java晋升指北,让天下没有难学的技术;视频教程资源共享,学习不难,坚持不难,坚持学习很难; >>>>
一、副本
副本
防止数据丢失,增加数据存储的冗余
1.1 数据副本分片
实现数据的水平切分
使用了 Replicated复制表系列引擎,才能应用副本的能力,同时增加了分布式协同的能力
在MergeeTree系列中,一个数据分区由开始创建到全部完成,会经历两类存储区域
内存:数据首先会被写入内存缓冲区本地缓存:数据接着会被写入tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区
ReplicatedMergeTree 还会增加Zookeeper部分,会在Zookeeper内创建一系列的监听节点,并以此实现多个实例之间的通信;
在整个通信过程中,Zookeeper并不会涉及表数据的传输
ENGINE = ReplicatedMergeTree('zk_path', 'replica_name')
zk_path: 用于指定在zk中创建的数据表的路径(自定义),一般使用约定俗成的模板/clickhouse/tables/{shard}/table_name
replica_name:定义在ZooKeeper中创建的副本名称,该名称是区分不同副本实例的唯一标识。一种约定成俗的命名方式是使用所在服务器的域名称
//1分片,1副本. zk_path相同,replica_name不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch5.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch6.nauu.com')
//分片1
//2分片,1副本. zk_path相同,其中{shard}=01, replica_name不同
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch5.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch6.nauu.com')
//分片2
//2分片,1副本. zk_path相同,其中{shard}=02, replica_name不同
ReplicatedMergeTree('/clickhouse/tables/02/test_1, 'ch7.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/02/test_1, 'ch8.nauu.com')
1.2 副本特点
1.3 Zk配置依赖Zookeeper
执行insert、alter查询时,需借助zk的分布式协同能力,以实现多个副本之间的同步
表级别的副本副本是表级别的定义每张表的副本配置都可以按照表的实际需求进行个性化定义
多主架构在任意一个副本执行insert、alter查询,效果相同
Block数据块执行写入命令时,会根据max_insert_block_size的大小,将数据切分成若干个Block数据块Block数据块是数据写入的基本单元,并且具备写入的原子性和唯一性
ClickHouse使用一组Zookeeper标签定义的相关配置,默认情况下,在全局的config.xml中定义即可。但是各个副本所使用的Zookeeper配置通常是相同的,为了便于在多个节点之间复制配置文件,更常见的做法是将这一部分配置抽离出来,独立使用一个文件保存
hdp1.nauu.com 2181
/etc/clickhouse-server/config.d/metrika.xml
# ClickHouse在它的系统表中,颇为贴心地提供了一张名为zookeeper的代理表 SELECt * FROM system.zookeeper where path = '/' ┌─name─────────────┬─value─┬─czxid─┐ │ dolphinscheduler │ │ 2627 │ │ clickhouse │ │ 92875 │ └──────────────────┴───────┴───────┘1.4 原理解析
1.4.1. 数据结构在ReplicatedMergeTree的核心逻辑中,大量运用了ZooKeeper的能力,以实现多个ReplicatedMergeTree副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和BlockID去重判断等。
在执行INSERT数据写入、MERGE分区和MUTATION操作的时候,都会涉及与ZooKeeper的通信。
但是在通信的过程中,并不会涉及任何表数据的传输,在查询数据的时候也不会访问ZooKeeper,所以不必过于担心ZooKeeper的承载压力
| 数据结构 | 说明 |
|---|---|
| ./metadata | 保存元数据信息,包括主键、分区键、采样表达式等 |
| ./columns | 保存列字段信息,包括列名称和数据类型 |
| ./replicas | 保存副本名称,对应设置参数中的replica_name |
| ·/leader_election | 用于主副本的选举工作,主副本会主导MERGE和MUTATION操作(ALTER DELETE和ALTER UPDATE)。这些任务在主副本完成之后再借助ZooKeeper将消息事件分发至其他副本 |
| ·/blocks | 记录Block数据块的Hash信息摘要,以及对应的partition_id。通过Hash摘要能够判断Block数据块是否重复;通过partition_id,则能够找到需要同步的数据分区 |
| ·/block_numbers | 按照分区的写入顺序,以相同的顺序记录partition_id。各个副本在本地进行MERGE时,都会依照相同的block_numbers顺序进行 |
| ·/quorum | 记录quorum的数量,当至少有quorum数量的副本写入成功后,整个写操作才算成功 |
| ·/log | 常规操作日志节点(INSERT、MERGE和DROP PARTITION),它是整个工作机制中最为重要的一环,保存了副本需要执行的任务指令 |
| ·/mutations | MUTATION操作日志节点,作用与log日志类似,当执行ALER DELETE和ALER UPDATE查询时,操作指令会被添加到这个节点 |
| ./replicas/{replica_name}/* | 每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令 |
1.4.2.1 Insert副本协同的核心流程:
INSERT:写入
MERGE:分区合并
MUTATION:数据修改
ALTER:元数据修改
INSERT和ALTER查询是分布式执行的
其他查询并不支持分布式执行,包括SELECT、CREATE、DROP、RENAME和ATTACH
借助ZooKeeper的事件通知机制,多个副本之间会自动进行有效协同,但是它们不会使用ZooKeeper存储任何分区数据
- 创建第一个副本实例
CREATE TABLE replicated_sales_1(
id String,
price Float64,
create_time DateTime
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch5.nauu.com')
PARTITION BY toYYYYMM(create_time)
ORDER BY id
在创建过程中,ReplicatedMergeTree会进行一些初始化操作
根据zk_path初始化所有的ZooKeeper节点在/replicas/节点下注册自己的副本实例ch5.nauu.com启动监听任务,监听/log日志节点参与副本选举,选举出主副本,选举的方式是向/leader_election/插入子节点,第一个插入成功的副本就是主副本
- 创建第二个副本实例
CREATE TABLE replicated_sales_1(
//相同结构
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1','ch6.nauu.com')
在/replicas/节点下注册自己的副本实例ch6.nauu.com启动监听任务,监听/log日志节点参与副本选举,选举出主副本。在这个例子中,CH5副本成为主副本
- 向第一个副本实例写入数据
INSERT INTO TABLE replicated_sales_1 VALUES('A001',100,'2019-05-10 00:00:00')
上述命令执行之后, 首先会在本地完成分区目录的写入:Renaming temporary part tmp_insert_201905_1_1_0 to 201905_0_0_0接着向/blocks节点写入该数据分区的block_id:Wrote block with ID ‘201905_2955817577822961065_12656761735954722499’(该block_id将作为后续去重操作的判断依据:Block with ID 201905_2955817577822961065_12656761735954722499 already exists; ignoring it.)此外,如果设置了insert_quorum参数(默认为0),并且insert_quorum>=2,则CH5会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum时,整个写入操作才算成功
由第一个副本实例推送Log日志(谁写谁负责)
- 在3步骤完成之后,会继续由执行了INSERT的副本向/log节点推送操作日志(CH5)日志的编号是/log/log-0000000000
第二个副本实例拉取Log日志
- CH6副本会一直监听/log节点变化,当CH5推送了/log/log-0000000000之后,CH6便会触发日志的拉取任务并更新log_pointer将其指向最新日志下标:/replicas/ch6.nauu.com/log_pointer : 0(这是因为在复杂的情况下,考虑到在同一时段内,会连续收到许多个LogEntry,所以使用队列的形式消化任务是一种更为合理的设)
第二个副本实例向其他副本发起下载请求
- CH6基于/queue队列开始执行任务。当看到type类型为get的时候,ReplicatedMerge-Tree会明白此时在远端的其他副本中已经成功写入了数据分区,而自己需要同步这些数据
- 从/replicas节点拿到所有的副本节点遍历这些副本,选取其中一个(取的副本需要拥有最大的log_pointer下标,并且/queue子节点数量最少)
第一个副本实例响应数据下载
第二个副本实例下载数据并完成本地写入
1.4.2.2 Merge在INSERT的写入过程中,ZooKeeper不会进行任何实质性的数据传输
谁执行谁负责
首先在本地写入了分区数据。之后,也由这个副本负责发送Log日志,通知其他副本下载数据
无论MERGE操作从哪个副本发起,其合并计划都会交由主副本来制定
在MERGE的合并过程中,ZooKeeper也不会进行任何实质性的数据传输,所有的合并操作,最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,都会首先被转交至主副本,再由主副本负责合并计划的制定、消息日志的推送以及对日志接收情况的监控
- 创建远程连接,尝试与主副本通信
- 即使是在CH6进行强制合并,CH6也会通过/replicate找到主副本CH5,并尝试建立与它的远程连接
optimize table replicated_sales_1 Connection (ch5.nauu.com:9000): Connecting. Database: default. User: default
- 主副本接收通信(主副本CH5接收并建立来自远端副本CH6的连接)
Connected ClickHouse Follower replica version 19.17.0, revision: 54428, database: default, user: default
- 由主副本制定MERGE计划并推送Log日志
- 由主副本CH5制定MERGE计划,并判断哪些分区需要被合并。在选定之后,CH5将合并计划转换为Log日志对象并推送Log日志,以通知所有副本开始合并
log/log-0000000002 source replica: ch5.nauu.com block_id: type : merge # 从日志内容中可以看出,操作类型为Merge合并 201905_0_0_0 # 合并的分区目录 201905_1_1_0 # 合并的分区目录 into 201905_0_1_1 # 与此同时,主副本还会锁住执行线程,对日志的接收情况进行监听 # 其监听行为由replication_alter_partitions_sync参数控制,默认值为1。当此参数为0时,不做任何等待;为1时,只等待主副本自身完成;为2时,会等待所有副本拉取完成
- 各个副本分别拉取Log日志
- CH5和CH6两个副本实例将分别监听/log/log-0000000002日志的推送,它们也会分别拉取日志到本地,并推送到各自的/queue任务队列
无论MUTATION操作从哪个副本发起,首先都会由主副本进行响应
- 推送MUTATION日志
- 即使在CH6进行mutation操作,也会找到主副本CH5 ALTER TABLE replicated_sales_1 DELETE WHERe id = '1'执行之后,该副本会接着进行两个重要事项
- 创建MUTATION ID将MUTATION操作转换为MutationEntry日志,并推送到/mutations/0000000000(MUTATION的操作日志是经由/mutations节点分发至各个副本)
- 当监听到有新的MUTATION日志加入时,并不是所有副本都会直接做出响应,它们首先会判断自己是否为主副本
- 只有主副本才会响应MUTATION日志,在这个例子中主副本为CH5,所以CH5将MUTATION日志转换为LogEntry日志并推送至/log节点,以通知各个副本执行具体的操作
- CH5和CH6两个副本分别监听/log/log-0000000003日志的推送,它们也会分别拉取日志到本地,并推送到各自的/queue任务队列
- CH5和CH6基于各自的/queue队列开始执行任务
当对ReplicatedMergeTree执行ALTER操作进行元数据修改的时候,即会进入ALTER部分的逻辑,例如增加、删除表字段等
本着谁执行谁负责的原则,在这个案例中由CH6负责对共享元数据的修改以及对各个副本修改进度的监控
- 修改共享元数据
- 执行之后,CH6会修改ZooKeeper内的共享元数据节点ALTER TABLE replicated_sales_1 ADD COLUMN id2 String数据修改后,节点的版本号也会同时提升与此同时,CH6还会负责监听所有副本的修改完成情况
- CH5和CH6两个副本分别监听共享元数据的变更之后,它们会分别对本地的元数据版本号与共享版本号进行对比
- CH6确认所有副本均已完成修改
2.1. 集群配置ClickHouse中的每个服务节点都可称为一个shard(分片)
ClickHouse的数据分片需要结合Distributed表引擎一同使用
Distributed表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。
在ClickHouse中,集群配置用shard代表分片、用replica代表副本。
由于Distributed表引擎需要读取集群的信息,所以首先必须为ClickHouse添加集群的配置
找到前面在介绍ZooKeeper配置时增加的metrika.xml配置文件,将其加入集群的配置信息
…… ch5.nauu.com 9000 ch6.nauu.com 9000
2.2. 分布式DDL--------------------------------------------------------------------------------- ch5.nauu.com 9000 ch6.nauu.com 9000 --------------------------------------------------------------------------------- ch5.nauu.com 9000 ch6.nauu.com 9000 ch5.nauu.com 9000 ch6.nauu.com 9000 ch7.nauu.com 9000 ch8.nauu.com 9000
在前面介绍数据副本时为了创建多张副本表,我们需要分别登录到每个ClickHouse节点,在它们本地执行各自的CREATE语句。
这是因为在默认的情况下,CREATE、DROP、RENAME和ALTER等DDL语句并不支持分布式执行
在加入集群配置后,就可以使用新的语法实现分布式DDL执行
CREATE/DROP/RENAME/ALTER TABLE ON CLUSTER cluster_name其中,cluster_name对应了配置文件中的集群名称,ClickHouse会根据集群的配置信息顺藤摸瓜,分别去各个节点执行DDL语句
# 1分片,2副本. zk_path相同,replica_name不同。
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch5.nauu.com')
ReplicatedMergeTree('/clickhouse/tables/01/test_1, 'ch6.nauu.com')
分布式DDL的形式
CREATE TABLE test_1_local ON CLUSTER shard_2(
id UInt64
--这里可以使用任意其他表引擎,
)ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test_1', '{replica}')
ORDER BY id
# 在执行了上述语句之后,ClickHouse会根据集群shard_2的配置信息,分别在CH5和CH6节点本地创建test_1_local
# 值得注意的是,在改写的CREATE语句中,用{shard}和{replica}两个动态宏变量代替了先前的硬编码方式
# 这些宏变量是通过配置文件的形式预先定义在各个节点的配置文件中的
这些宏变量是通过配置文件的形式预先定义在各个节点的配置文件中的01 ch5.nauu.com 02 ch6.nauu.com
分布式DDL语句在执行的过程中也需要借助ZooKeeper的协同能力,以实现日志分发 在默认情况下,分布式DDL在ZooKeeper内使用的根路径为 /clickhouse/task_queue/ddl 该路径由config.xml内的distributed_ddl配置指定2.2.1 DDL原理在此根路径之下,还有一些其他的监听节点,其中包括/query-[seq],其是DDL操作日志,每执行一次分布式DDL查询,在该节点下就会新增一条操作日志,以记录相应的操作指令 当各个节点监听到有新日志加入的时候,便会响应执行 DDL操作日志使用ZooKeeper的持久顺序型节点,每条指令的名称以query-为前缀,后面的序号递增,例如query-0000000000、query-0000000001等 在每条query-[seq]操作日志之下,还有两个状态节点 (1)/query-[seq]/active:用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为active的节点。 (2)/query-[seq]/finished:用于检查任务完成情况,在任务的执行过程中,每当集群内的某个host节点执行完毕之后,便会在该节点下写入记录。 /query-000000001/finished ch5.nauu.com:9000 : 0 ch6.nauu.com:9000 : 0 DDLLogEntry日志对象的数据结构(在/query-[seq]下记录的日志信息由DDLLogEntry承载) (1)query记录了DDL查询的执行语句 query: DROP TABLE default.test_1_local ON CLUSTER shard_2 (2)hosts记录了指定集群的hosts主机列表,集群由分布式DDL语句中的ON CLUSTER指定 hosts: ['ch5.nauu.com:9000','ch6.nauu.com:9000'] (3)initiator记录初始化host主机的名称,hosts主机列表的取值来自于初始化host节点上的集群 /clickhouse/task_queue/ddl
- 推送DDL日志
- 首先在CH5节点执行CREATE TABLE ON CLUSTER谁执行谁负责 CH5节点负责创建DDLLogEntry日志并将日志推送到ZooKeeper,同时也会由这个节点负责监控任务的执行进度
- CH5和CH6两个节点分别监听/ddl/query-0000000064日志的推送,于是它们分别拉取日志到本地它们会判断各自的host是否被包含在DDLLog-Entry的hosts列表中如果包含在内,则进入执行流程,执行完毕后将状态写入finished节点如果不包含,则忽略这次日志的推送
- 执行DDL语句之后,客户端会阻塞等待180秒,以期望所有host执行完毕如果等待时间大于180秒,则会转入后台线程继续等待(等待时间由distributed_ddl_task_timeout参数指定,默认为180秒)
Distributed表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作
2.3.1 使用定义本地表
通常以_local为后缀进行命名本地表是承接数据的载体,可以使用非Distributed的任意表引擎一张本地表对应了一个数据分片
分布式表通常以_all为后缀进行命名分布式表只能使用Distributed表引擎它与本地表形成一对多的映射关系
对于分布式表与本地表之间表结构的一致性检查,Distributed表引擎采用了读时检查的机制,这意味着如果它们的表结构不兼容,只有在查询时才会抛出错误,而在创建表时并不会进行检查
ENGINE = Distributed(cluster, database, table [,sharding_key])
cluster:集群名称,与集群配置中的自定义名称相对应
database和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表
sharding_key:分片键
CREATE TABLE test_shard_2_all ON CLUSTER sharding_simple (
id UInt64
)ENGINE = Distributed(sharding_simple, default, test_shard_2_local,rand())
CREATE TABLE test_shard_2_local ON CLUSTER sharding_simple (
id UInt64
)ENGINE = MergeTree()
ORDER BY id
PARTITION BY id
2.3.2 查询分类
2.3.3 分片规则会作用于本地表的查询
对于INSERT和SELECT查询,Distributed将会以分布式的方式作用于local本地表
只会影响Distributed自身,不会作用于本地表的查询Distributed支持部分元数据操作,包括CREATE、DROP、RENAME和ALTER,其中ALTER并不包括分区的操作(ATTACH PARTITION、REPLACE PARTITION等)这些查询只会修改Distributed表自身,并不会修改local本地表
不支持的查询Distributed表不支持任何MUTATION类型的操作,包括ALTER DELETE和ALTER UPDATE
分片键要求返回一个整型类型的取值,包括Int系列和UInt系列
-- 按照用户id的余数划分 Distributed(cluster, database, table ,userid) -- 按照随机数划分 Distributed(cluster, database, table ,rand()) -- 按照用户id的散列值划分 Distributed(cluster, database, table , intHash64(userid))
2.3.4 写入流程分片权重(weight)slot(槽)
1. slot可以理解成许多小的水槽,如果把数据比作是水的话,那么数据之水会顺着这些水槽流进每个数据分片
2. 如果slot值落在[0,10)区间,则对应第一个分片;如果slot值落在[10,20]区间,则对应第二个分选择函数它会找出slot的取值,其计算公式:slot = shard_value % sum_weight基于slot值找到对应的数据分片
在向集群内的分片写入数据时,通常有两种思路:
借助外部计算系统,事先将数据均匀分片,再借由计算系统直接将数据写入ClickHouse集群的各个本地表通过Distributed表引擎代理写入分片数据的
2.3.4.1 Distribured方式这种方案通常拥有更好的写入性能,因为分片数据是被并行点对点写入的
但是这种方案的实现主要依赖于外部系统,而不在于ClickHouse自身
- 在第一个分片节点写入本地分片数据
- 首先在CH5节点,对分布式表test_shard_2_all执行INSERT查询,尝试写入10、30、200和55四行数据执行之后分布式表主要会做两件事情
- 根据分片规则划分数据将属于当前分片的数据直接写入本地表test_shard_2_local
- 将归至远端分片的数据以分区为单位,分别写入test_shard_2_all存储目录下的临时bin文件;database@host:port/[increase_num].bin
- 此时,会有另一组监听任务负责监听/test_shard_2_all目录下的文件变化,这些任务负责将目录数据发送至远端分片其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩
- CH6分片节点确认建立与CH5的连接在接收到来自CH5发送的数据后,将它们写入本地表
- 由CH5分片确认所有的数据发送完毕
Distributed表负责所有分片的写入工作
谁执行谁负责的原则
在这个示例中,由CH5节点的分布式表负责切分数据,并向所有其他分片节点发送数据
Distributed表负责向远端分片发送数据时,有异步写和同步写两种模式
异步
在Distributed表写完本地分片之后,INSERT查询就会返回成功写入的信息
同步在执行INSERT查询之后,会等待所有分片完成写入
模式由insert_distributed_sync参数控制,默认为false,即异步写
如果在集群的配置中包含了副本,那么除了刚才的分片写入流程之外,还会触发副本数据的复制流程。
数据在多个副本之间,有两种复制实现方式:
借助Distributed表引擎,由它将数据写入副本;借助ReplicatedMergeTree表引擎实现副本数据的分发
如果在集群的shard配置中增加internal_replication参数并将其设置为true(默认为false),那么Distributed表在该shard中只会选择一个合适的replica并对其写入数据



