Kafka分区

博客 动态
0 188
优雅殿下
优雅殿下 2022-05-30 21:59:52
悬赏:0 积分 收藏

Kafka 分区

一、副本机制

Kafka在?定数量的服务器上对主题分区进?复制。

当集群中的?个broker宕机后系统可以?动故障转移到其他可?的副本上,不会造成数据丢失。

创建主题:

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_02 --partitions 2 --replication-factor 3

上面创建主题中的 --replication-factor 3 表示有3个副本,1个Leader + 2个 Follower

  1. 将复制因?为1的未复制主题称为复制主题。
  2. 主题的分区是复制的最?单元。
  3. 在?故障情况下,Kafka中的每个分区都有?个Leader副本和零个或多个Follower副本。
  4. 包括Leader副本在内的副本总数构成复制因?。
  5. 所有读取和写?都由Leader副本负责。
  6. 通常,分区?broker多,并且Leader分区在broker之间平均分配

Follower分区像普通的Kafka消费者?样,消费来?Leader分区的消息,并将其持久化到??的?志中

允许Follower对?志条?拉取进?批处理

同步节点定义:

  1. 节点必须能够维持与ZooKeeper的会话(通过ZooKeeper的?跳机制)
  2. 对于Follower副本分区,它复制在Leader分区上的写?,并且不要延迟太多

Kafka提供的保证是,只要有?少?个同步副本处于活动状态,提交的消息就不会丢失。

宕机如何恢复

  1. 少部分副本宕机

    当leader宕机了,会从follower选择?个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader?pull数据。

  2. 全部副本宕机

    当全部副本宕机了有两种恢复?式

    • 等待ISR中的?个恢复后,并选它作为leader。(等待时间较?,降低可?性)
    • 选择第?个恢复的副本作为新的leader,?论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

二、Leader 选举

先看一张图片,在这张图片中:

  • 分区P1的Leader是0,ISR是0和1
  • 分区P2的Leader是2,ISR是1和2
  • 分区P3的Leader是1,ISR是0,1,2。

?产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。

对于P1,如果0宕机会发?什么?

Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发?故障的时候,就需要进?分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。

如何选举

如果某个分区所在的服务器除了问题,不可?,kafka会从该分区的其他的副本中选择?个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。

只有那些跟Leader保持同步的Follower才应该被选作新的Leader。

Kafka会在Zookeeper上针对每个Topic维护?个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是?些分区的副本。

只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的?产者。

如果这个集合有增减,kafka会更新zookeeper上的记录。

如果某个分区的Leader不可?,Kafka就会从ISR集合中选择?个副本作为新的Leader。

显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数?较?。

假设某个topic有N+1个副本,kafka可以容忍N个服务器不可?。

为什么不?少数服从多数的?法

少数服从多数是?种?较常?的?致性算发和Leader选举法。

它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;

选择Leader时也是从超过半数的同步的副本中选择。

这种算法需要较?的冗余度,跟Kafka?起来,浪费资源。

譬如只允许?台机器失败,需要有三个副本;?如果只容忍两台机器失败,则需要五个副本。

?kafka的ISR集合?法,分别只需要两个和三个副本。

如果所有的ISR副本都失败了怎么办

此时有两种?法可选:

  1. 等待ISR集合中的副本复活,
  2. 选择任何?个?即可?的副本,?这个副本不?定是在ISR集合中。
    • 需要设置 unclean.leader.election.enable=true

这两种?法各有利弊,实际?产中按需选择。

如果要等待ISR副本复活,虽然可以保证?致性,但可能需要很?时间。?如果选择?即可?的副本,则很可能该副本并不?致。

总结

Kafka中Leader分区选举,通过维护?个动态变化的ISR集合来实现,?旦Leader分区丢掉,则从ISR中随机挑选?个副本做新的Leader分区。

如果ISR中的副本都丢失了,则:

  1. 可以等待ISR中的副本任何?个恢复,接着对外提供服务,需要时间等待。
  2. 从OSR中选出?个副本做Leader副本,此时会造成数据丢失

三、分区重新分配

向已经部署好的Kafka集群??添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置?件,然后把??的broker id修改成全局唯?的,最后启动这个节点即可将它加?到现有Kafka集群中。

问题:新添加的Kafka节点并不会?动地分配数据,?法分担集群的负载,除?我们新建?个topic。

需要?动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的?具来重新分布某个topic的分区。

在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:

创建主题

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1

查看主题信息

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 4 Leader: 0 Replicas: 0 Isr: 0

在node11搭建Kafka

安装 JDK、Kafka,这里不需要安装Zookeeper

修改 Kafka 的配置config/server.properties

broker.id=1zookeeper.connect=node1:2181/myKafka

启动 kafka

[root@node11 ~]# kafka-server-start.sh /usr/src/kafka_2.12-1.0.2/config/server.properties

注意观察node11上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否?致,如果是,证明node11和node1在同?个集群中。

node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

在node1上查看zookeeper的节点信息(node11的节点已经加?集群了):

现在我们在现有集群的基础上再添加?个Kafka节点,然后使?Kafka?带的kafka-reassign-partitions.sh ?具来重新分布分区。该?具有三种使?模式:

  1. generate模式,给定需要重新分配的Topic,?动?成reassign plan(并不执?)
  2. execute模式,根据指定的reassign plan重新分配Partition
  3. verify模式,验证重新分配Partition是否成功

我们将分区3和4重新分布到broker1上,借助kafka-reassign-partitions.sh?具?成reassign plan,不过我们先得按照要求定义?个?件,??说明哪些topic需要重新分区,?件内容如下

[root@node1 ~]# cat topics-to-move.json { 	"topics": [ 		{ "topic":"tp_re_01" } 	], 	"version":1}

然后使?kafka-reassign-partitions.sh?具?成reassign plan

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --topicsto-move-json-file topics-to-move.json --broker-list "0,1" --generateCurrent partition replica assignment{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Proposed partition reassignment configuration下??成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为result.json?件??(?件名不重要,?件格式也不?定要以json为结尾,只要保证内容是json即可),然后执?这些reassign plan:

执?计划:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --executeCurrent partition replica assignment{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions.

这样Kafka就在执?reassign plan,我们可以校验reassign plan是否执?完成:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file topics-to-execute.json --verifyStatus of partition reassignment: Reassignment of partition tp_re_01-1 completed successfullyReassignment of partition tp_re_01-4 completed successfullyReassignment of partition tp_re_01-2 completed successfullyReassignment of partition tp_re_01-3 completed successfullyReassignment of partition tp_re_01-0 completed successfully

查看主题的细节:

分区的分布的确和操作之前不?样了,broker 1上已经有分区分布上去了。使? kafka-reassign-partitions.sh?具?成的reassign plan只是?个建议,?便?家?已。其实我们??完全可以编辑?个reassign plan,然后执?它,如下:

{    "version": 1,    "partitions": [{        "topic": "tp_re_01",        "partition": 4,        "replicas": [1],        "log_dirs": ["any"]     }, {         "topic": "tp_re_01",         "partition": 1,         "replicas": [0],         "log_dirs": ["any"]     }, {         "topic": "tp_re_01",         "partition": 2,         "replicas": [0],         "log_dirs": ["any"]     }, {         "topic": "tp_re_01",         "partition": 3,         "replicas": [1],         "log_dirs": ["any"]     }, {         "topic": "tp_re_01",         "partition": 0,         "replicas": [0],         "log_dirs": ["any"]     }]}        

将上?的json数据?件保存到my-topics-to-execute.json?件中,然后也是执?它:

[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file my-topics-to-execute.json --executeCurrent partition replica assignment{"version":1,"partitions":[{"topic":"tp_re_01","partition":4,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"tp_re_01","partition":3,"replicas":[1],"log_dirs":["any"]},{"topic":"tp_re_01","partition":0,"replicas":[0],"log_dirs":["any"]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions.[root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file my-topics-to-execute.json --verifyStatus of partition reassignment:Reassignment of partition tp_re_01-1 completed successfullyReassignment of partition tp_re_01-4 completed successfullyReassignment of partition tp_re_01-2 completed successfullyReassignment of partition tp_re_01-3 completed successfullyReassignment of partition tp_re_01-0 completed successfully

等这个reassign plan执?完,我们再来看看分区的分布:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_01Topic:tp_re_01 PartitionCount:5 ReplicationFactor:1 Configs:Topic: tp_re_01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 2 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_01 Partition: 3 Leader: 1 Replicas: 1 Isr: 1Topic: tp_re_01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1[root@node1 ~]#

四、自动再平衡

我们可以在新建主题的时候,?动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪个broker节点上。

随着系统的运?,broker的宕机重启,会引发Leader分区和Follower分区的??转换,最后可能Leader?部分都集中在少数?台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区的少数?台服务器的?络I/O,CPU,以及内存都会很紧张。

Leader和Follower的??转换会引起Leader副本在集群中分布的不均衡,此时我们需要?种?段,让Leader的分布重新恢复到?个均衡的状态。

执?脚本:

[root@node11 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"

上述脚本执?的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第?个指定的brokerId上,Follower副本在随后指定的brokerId上。

然后模拟broker0宕机的情况:

# 通过jps找到Kafka进程PID[root@node1 ~]# jps54912 Jps1699 QuorumPeerMain1965 Kafka# 直接杀死进程[root@node1 ~]# kill -9 1965[root@node1 ~]# jps1699 QuorumPeerMain54936 Jps[root@node1 ~]## 查看主题分区信息:[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1[root@node1 ~]## 重新启动node1上的Kafka[root@node1 ~]# kafka-server-start.sh -daemon /opt/kafka_2.12-1.0.2/config/server.properties[root@node1 ~]# jps1699 QuorumPeerMain55525 Kafka55557 Jps[root@node1 ~]## 查看主题的分区信息:[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:Topic: tp_demo_03 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0Topic: tp_demo_03 Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1,0[root@node1 ~]## broker恢复了,但是Leader的分配并没有变化,还是处于Leader切换后的分配情况。

是否有?种?式,可以让Kafka?动帮我们进?修改?改为初始的副本分配?

此时,?到了Kafka提供的?动再均衡脚本:kafka-preferred-replica-election.sh

先看介绍:

该?具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配。

如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进?操作,?动再平衡。

具体操作:

  1. 创建preferred-replica.json,内容如下:

    {    "partitions": [        {            "topic":"tp_demo_03",            "partition":0        },        {            "topic":"tp_demo_03",            "partition":1        },        {            "topic":"tp_demo_03",            "partition":2        }    ]}
  2. 执?操作:

    [root@node1 ~]# kafka-preferred-replica-election.sh --zookeeper node1:2181/myKafka --path-to-json-file preferred-replicas.jsonCreated preferred replica election path with {"version":1,"partitions":[{"topic":"tp_demo_03","partition":0},{"topic":"tp_demo_03","partition":1},{"topic":"tp_demo_03","partition":2}]}Successfully started preferred replica election for partitions Set(tp_demo_03-0, tp_demo_03-1, tp_demo_03-2)[root@node1 ~]#
  3. 查看操作的结果

    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_demo_03Topic:tp_demo_03 PartitionCount:3 ReplicationFactor:2 Configs:Topic: tp_demo_03 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 1,0 Topic: tp_demo_03 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0Topic: tp_demo_03 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 1,0[root@node1 ~]#

恢复到最初的分配情况。

之所以是这样的分配,是因为我们在创建主题的时候:

--replica-assignment "0:1,1:0,0:1"

在逗号分割的每个数值对中排在前?的是Leader分区,后?的是副本分区。那么所谓的preferred replica,就是排在前?的数字就是Leader副本应该在的brokerId。

五、修改分区副本

实际项?中,我们可能由于主题的副本因?设置的问题,需要重新设置副本因?

或者由于集群的扩展,需要重新设置副本因?。

topic?旦使??不能轻易删除重建,因此动态增加副本因?就成为最终的选择。

说明:kafka 1.0版本配置?件默认没有default.replication.factor=x, 因此如果创建topic时,不指定–replication-factor 想, 默认副本因?为1. 我们可以在??的server.properties中配置上常?的副本因?,省去?动调整。例如设置default.replication.factor=3, 详细内容可参考官??档https://kafka.apache.org/documentation/#replication

原因分析

假设我们有2个kafka broker分别broker0,broker1。

  1. 当我们创建的topic有2个分区partition时并且replication-factor为1,基本上?个broker上?个分区。当?个broker宕机了,该topic就?法使?了,因为两个个分区只有?个能?。

  2. 当我们创建的topic有3个分区partition时并且replication-factor为2时,可能分区数据分布情况是

    broker0, partiton0,partiton1,partiton2,

    broker1, partiton1,partiton0,partiton2,

    每个分区有?个副本,当其中?个broker宕机了,kafka集群还能完整凑出该topic的两个分区,例如当broker0宕机了,可以通过broker1组合出topic的两个分区。

  3. 创建主题

    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
  4. 查看主题细节

    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02Topic:tp_re_02 PartitionCount:3 ReplicationFactor:1 Configs:Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1 Isr: 1[root@node1 ~]#
  5. 修改副本因?:错误

  6. 使?kafka-reassign-partitions.sh修改副本因?:
    创建increment-replication-factor.json

    {    "version":1,    "partitions":[        {"topic":"tp_re_02","partition":0,"replicas":[0,1]}, 	        {"topic":"tp_re_02","partition":1,"replicas":[0,1]},         {"topic":"tp_re_02","partition":2,"replicas":[1,0]}    ]}
  7. 执?分配

    [root@node1 ~]# kafka-reassign-partitions.sh --zookeeper node1:2181/myKafka --reassignment-json-file increase-replication-factor.json --executeCurrent partition replica assignment{"version":1,"partitions":[{"topic":"tp_re_02","partition":2,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":1,"replicas":[0,1],"log_dirs":["any","any"]},{"topic":"tp_re_02","partition":0,"replicas":[0,1],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions.
  8. 查看主题细节

    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --describe --topic tp_re_02Topic:tp_re_02 PartitionCount:3 ReplicationFactor:2 Configs:Topic: tp_re_02 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0Topic: tp_re_02 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1Topic: tp_re_02 Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0[root@node1 ~]#

六、分区分配策略


在Kafka中,每个Topic会包含多个分区,默认情况下?个分区只能被?个消费组下?的?个消费者消费,这?就产?了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。

6.1 RangeAssignor

PartitionAssignor接??于?户定义实现分区分配算法,以实现Consumer之间的分区分配。

消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的?个消费者来执?这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采?RangeAssignor的分配算法

RangeAssignor对每个Topic进?独?的分区分配。对于每?个Topic,?先对分区按照分区ID进?数值排序,然后订阅这个Topic的消费组的消费者再进?字典排序,之后尽量均衡的将分区分配给消费者。这?只能是尽量均衡,因为分区数可能?法被消费者数量整除,那么有?些消费者就会多分配到?些分区。

?致算法如下:

assign(topic, consumers) {    // 对分区和Consumer进?排序    List<Partition> partitions = topic.getPartitions();     sort(partitions);     sort(consumers);    // 计算每个Consumer分配的分区数    int numPartitionsPerConsumer = partition.size() / consumers.size();    // 额外有?些Consumer会多分配到分区    int consumersWithExtraPartition = partition.size() % consumers.size();    // 计算分配结果    for (int i = 0, n = consumers.size(); i < n; i++) {        // 第i个Consumer分配到的分区的index        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);        // 第i个Consumer分配到的分区数        int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);        // 分装分配结果        assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));    }}

RangeAssignor策略的原理是按照消费者总数和分区总数进?整除运算来获得?个跨度,然后将分区按照跨度进?平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每?个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配?个分区。

这种分配?式明显的?个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,?如上图中4个分区3个消费者的场景,C0会多分配?个分区。如果此时再订阅?个分区数为4的Topic,那么C0?会?C1、C2多分配?个分区,这样C0总共就?C1、C2多分配两个分区了,?且随着Topic的增加,这个情况会越来越严重。

字典序靠前的消费组中的消费者?较“贪婪”。

6.2 RoundRobinAssignor

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进?排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进?排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与?些Topic的分配。

相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的?式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越?)。

对于消费组内消费者订阅Topic不?致的情况:假设有两个个消费者分别为C0和C1,有2个Topic T1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结果如下:

看上去分配已经尽量的保证均衡了,不过可以发现C0承担了4个分区的消费?C1订阅了T2?个分区,是不是把T2P0交给C1消费能更加的均衡呢?

6.3 StickyAssignor

动机

尽管RoundRobinAssignor已经在RangeAssignor上做了?些优化来更均衡的分配分区,但是在?些情况下依旧会产?严重的分配偏差,?如消费组中订阅的Topic列表不相同的情况下。

更核?的问题是?论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上?次的分配结果。显然,在执??次新的分配之前,如果能考虑到上?次分配的结果,尽量少的调整分区分配的变动,显然是能节省很多开销的。

目标

从字?意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”:

  1. 分区的分配尽量的均衡
  2. 每?次重分配的结果尽量与上?次分配结果保持?致

当这两个?标发?冲突时,优先保证第?个?标。第?个?标是每个分配算法都尽量尝试去完成的,?第?个?标才真正体现出StickyAssignor特性的。

我们先来看预期分配的结构,后续再具体分析StickyAssignor的算法实现。

例如:

  • 有3个Consumer:C0、C1、C2
  • 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区
  • 所有Consumer都订阅了这4个分区

StickyAssignor的分配结果如下图所示(增加RoundRobinAssignor分配作为对?):

如果消费者1宕机,则按照RoundRobin的?式分配结果如下。打乱从新来过,轮询分配:

按照Sticky的?式。仅对消费者1分配的分区进?重分配,红线部分。最终达到均衡的?的:

再举?个例?:

  • 有3个Consumer:C0、C1、C2
  • 3个Topic:T0、T1、T2,它们分别有1、2、3个分区
  • C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2

分配结果如下图所示:

消费者0下线,则按照轮询的?式分配:

按照Sticky?式分配分区,仅仅需要动的就是红线部分,其他部分不动。

StickyAssignor分配?式的实现稍微复杂点?,我们可以先理解图示部分即可。感兴趣的同学可以研究?下。

6.4 自定义分区策略

?定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接?。PartitionAssignor接?的定义如下:

Subscription subscription(Set<String> topics);String name();Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);void onAssignment(Assignment assignment);class Subscription {     private final List<String> topics;     private final ByteBuffer userData;}class Assignment {     private final List<TopicPartition> partitions;     private final ByteBuffer userData;}

PartitionAssignor接?中定义了两个内部类:Subscription和Assignment。

Subscription类?来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic列表和?户?定义信息。PartitionAssignor接?通过subscription()?法来设置消费者?身相关的Subscription信息,注意到此?法中只有?个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体现。为了增强?户对分配结果的控制,可以在subscription()?法内部添加?些影响分配的?户?定义信息赋予userData,?如:权重、ip地址、host或者机架(rack)等等。

再来说?下Assignment类,它是?来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表示所分配到的分区集合和?户?定义的数据。可以通过PartitionAssignor接?中的onAssignment()?法是在每个消费者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个?法保存当前的分配?案,以备在下次消费组再平衡(rebalance)时可以提供分配参考依据。

接?中的name()?法?来提供分配策略的名称,对于Kafka提供的3种分配策略??,RangeAssignor对应的protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的protocol_name为“sticky”,所以?定义的分配策略中要注意命名的时候不要与已存在的分配策略发?冲突。这个命名?来标识分配策略的名称,在后?所描述的加?消费组以及选举消费组leader的时候会有涉及。

真正的分区分配?案的实现是在assign()?法中,?法中的参数metadata表示集群的元数据信息,?subscriptions表示消费组内各个消费者成员的订阅信息,最终?法返回各个消费者的分配信息。

Kafka中还提供了?个抽象类org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化PartitionAssignor接?的实现,对assign()?法进?了实现,其中会将Subscription中的userData信息去掉后,在进?分配。Kafka提供的3种分配策略都是继承?这个抽象类。如果开发?员在?定义分区分配策略时需要使?userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,?需要直接实现PartitionAssignor接?。

自定义分区策略

package org.apache.kafka.clients.consumer;import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyAssignor extends AbstractPartitionAssignor { }

在使?时,消费者客户端需要添加相应的Properties参数,示例如下:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyAssignor.class.getName());
posted @ 2022-05-30 21:44 下半夜的风 阅读(0) 评论(0) 编辑 收藏 举报
回帖
    优雅殿下

    优雅殿下 (王者 段位)

    2018 积分 (2)粉丝 (47)源码

    小小码农,大大世界

     

    温馨提示

    亦奇源码

    最新会员