创建Topic时,需要指定partitions和replication-factor,而最重要的是尽量均匀的把每个replica分发到Brokers集群中。
即创建Topic第一步要生成TopicPartitionAssignment,并写zookeeper中,第二个根据KafkaController中的监听执行生成流程。
kafka-topics.sh提供了两种方式:
- 指定replica-assignment, 即完全人为分配,指定某个replica分配到某个broker上面
- 由Kafka默认策略分配
默认分配原则如下:1. Spread the replicas evenly among brokers. 尽量均匀的分配replica到每个Broker上 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers. 对于Partition的某个Replica已经分配到某个Broker上面,那么其他的Replicas需要尽量的分配到其他的Broker上面 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible 对于Brokers配置了Rack感知信息,Replicas还需要尽可能的分配到不同的Rack上面
1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
随机(round-robin)方式分配第一个Replica. 随机是为了避免broker排序靠前的负载很高
对于自动分配,fixedStartIndex = -1
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
随机生成第一个Replica的位置
2. Assign the remaining replicas of each partition with an increasing shift.
随机生成一个偏移量,以这个偏移量(每轮都需要递增)分配其他的Replicas。每一轮都递增shift是为了避免当partition数量大于broker时,会导致只有固定间隔的Broker被分配到Replica
对于自动分配,fixedStartIndex = -1
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
随机生成固定的偏移量
case1: brokers = 3 (1,2,3), partitions = 2, replica = 1, startIndex = 2, nextReplicaShift = 2
p0r0 => curPosition=0, startIndex=2 ((0 + 2) % 3) = 2 => brokers[2] = 3
p1r0 => curPosition=1, startIndex=2 ((1 + 2) % 3) = 0 => brokers[0] = 1
case2: brokers = 3 (1,2,3), partitions = 2, replica = 2, startIndex = 2, nextReplicaShift = 2
p0r0 => curPosition=0, startIndex=2 => ((0 + 2) % 3) = 2 => brokers[2] = 3
p0r1 => firstReplicaIndex=2, nextReplicaShift=2 => (2+(2+0)%(3-1)+1)%3 => brokers[0] = 1
p1r0 => curPosition=1, startIndex=2 ((1 + 2) % 3) = 0 => brokers[0] = 1
p1r1 => firstReplicaIndex=0, nextReplicaShift=2+1 (0+(3+1)%(3-1)+1)%3 => brokers[1] = 2
考虑Rack(AdminUtils.assignReplicasToBrokersRackAware)


