消费者:
__consumer_offsets的主题中Zookeeper,需要设置offset.storage=zookeeper消费组:
configs.put("group.id", "xxx");group.id=xxxxx?个拥有四个分区的主题,包含?个消费者的消费组
此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。
如果在消费组中添加?个消费者2,则每个消费者分别从两个分区接收消息
如果消费组有四个消费者,则每个消费者可以分配到?个分区
如果向消费组中添加更多的消费者,超过主题分区数量,则有?部分消费者就会闲置,不会接收任何消息
向消费组添加消费者是横向扩展消费能?的主要?式。
必要时,需要为主题创建?量分区,在负载增?时可以加?更多的消费者。但是不要让消费者的数量超过主题分区的数量。
除了通过增加消费者来横向扩展单个应?的消费能?之外,经常出现多个应?程序从同?个主题消费的情况。
此时,每个应?都可以获取到所有的消息。只要保证每个应?都有??的消费组,就可以让它们获取到主题所有的消息。
横向扩展消费者和消费组不会对性能造成负?影响。
为每个需要获取?个或多个主题全部消息的应?创建?个消费组,然后向消费组添加消费者来横向扩展消费能?和应?的处理能?,则每个消费者只处理?部分消息。
初始的消费者消费分区:
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区
由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置
Kafka 的?跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送?跳。
Consumer 和 Rebalance 相关的 2 个配置参数:
| 参数 | 字段 |
|---|---|
| session.timeout.ms | MemberMetadata.sessionTimeoutMs |
| max.poll.interval.ms | MemberMetadata.rebalanceTimeoutMs |
broker 端,sessionTimeoutMs 参数
broker 处理?跳的逻辑在 GroupCoordinator类中。如果?跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。
可以看看源码的kafka.coordinator.group.GroupCoordinator#completeAndScheduleNextHeartbeatExpiration方法。
如果客户端发现?跳超期,客户端会标记 coordinator 为不可?,并阻塞?跳线程;如果超过了 poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发 rebalance
可以看看源码的org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread 内部类
| 参数 | 说明 |
|---|---|
| bootstrap.servers | 向Kafka集群建?初始连接?到的host/port列表。 客户端会使?这?列出的所有服务器进?集群其他服务器的发现,?不管是否指定了哪个服务器?作引导。 这个列表仅影响?来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,... 由于这组服务器仅?于建?初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这?。 ?般最好两台,以防其中?台宕掉。 |
| key.deserializer | key的反序列化类,该类需要实现org.apache.kafka.common.serialization.Deserializer接?。 |
| value.deserializer | 实现了org.apache.kafka.common.serialization.Deserializer接?的反序列化器,?于对消息的value进?反序列化。 |
| client.id | 当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上提供应?的逻辑名称,记录在服务端的请求?志中,?于追踪请求的源。 |
| group.id | ?于唯?标志当前消费者所属的消费组的字符串。 如果消费者使?组管理功能如subscribe(topic)或使?基于Kafka的偏移量管理策略,该项必须设置。 |
| auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:?动重置偏移量到最早的偏移量 latest:?动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常 anything:向消费者抛异常 |
| enable.auto.commit | 如果设置为true,消费者会?动周期性地向服务器提交偏移量。 |
Topic:Kafka?于分类管理消息的逻辑单元,类似与MySQL的数据库。
*Partition:是Kafka下数据存储的基本单元,这个是物理上的概念。同?个topic的数据,会被分散的存储到多个partition中,这些partition可以在同?台机器上,也可以是在多台机器上。优势在于:有利于?平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提?容灾能?。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。
Consumer Group:同样是逻辑上的概念,是Kafka实现单播和?播两种消息模型的?段**。保证?个消费组获取到特定主题的全部的消息。在消费组内部,若?个消费者消费主题分区的消息,消费组可以保证?个主题的每个分区只被消费组中的?个消费者消费。
consumer 采? pull 模式从 broker 中读取数据。
采? pull 模式,consumer 可?主控制消费消息的速率, 可以??控制消费?式(批量消费/逐条消费),还可以选择不同的提交?式从?实现不同的传输语义。
订阅主题:consumer.subscribe("tp_demo_01,tp_demo_02")
Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进?反序列化处理,然后才能交给?户程序消费处理。
常用的Kafka提供的,反序列化器包括key的和value的反序列化器:
消费者从订阅的主题拉取消息:consumer.poll(3_000);
在Fetcher类中,对拉取到的消息?先进?反序列化处理:
private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) { try { long offset = record.offset(); long timestamp = record.timestamp(); Optional<Integer> leaderEpoch = this.maybeLeaderEpoch(batch.partitionLeaderEpoch()); TimestampType timestampType = batch.timestampType(); Headers headers = new RecordHeaders(record.headers()); ByteBuffer keyBytes = record.key(); byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); ByteBuffer valueBytes = record.value(); byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); return new ConsumerRecord(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? -1 : keyByteArray.length, valueByteArray == null ? -1 : valueByteArray.length, key, value, headers, leaderEpoch); } catch (RuntimeException var17) { throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", var17); }}Kafka默认提供了?个反序列化的实现:
org.apache.kafka.common.serialization.ByteArrayDeserializer
org.apache.kafka.common.serialization.ByteBufferDeserializer
org.apache.kafka.common.serialization.BytesDeserializer
org.apache.kafka.common.serialization.DoubleDeserializer
org.apache.kafka.common.serialization.FloatDeserializer
org.apache.kafka.common.serialization.IntegerDeserializer
org.apache.kafka.common.serialization.LongDeserializer
org.apache.kafka.common.serialization.ShortDeserializer
org.apache.kafka.common.serialization.StringDeserializer
反序列化器都需要实现org.apache.kafka.common.serialization.Deserializer<T>接?:
这里根据前面自定义的序列化器,再自定义一个反序列化器。
先回顾一下前面的序列化器,添加了一个 User 对象,需要序列化User对象:
public class User { private Integer userId; private String username; // set、get、toString、全参构造函数、无参构造函数 方法省略,}/** * User对象的序列化器 */public class UserSerializer implements Serializer<User> { @Override public void configure(Map<String, ?> map, boolean b) { // do Nothing } @Override public byte[] serialize(String topic, User user) { try { // 如果数据是null,则返回null if (user == null) return null; Integer userId = user.getUserId(); String username = user.getUsername(); int length = 0; byte[] bytes = null; if (null != username) { bytes = username.getBytes("utf-8"); length = bytes.length; } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length); buffer.putInt(userId); buffer.putInt(length); buffer.put(bytes); return buffer.array(); } catch (UnsupportedEncodingException e) { throw new SerializationException("序列化数据异常"); } } @Override public void close() { // do Nothing }}这里再自定义一个反序列化器:
public class UserDeserializer implements Deserializer<User> { @Override public void configure(Map<String, ?> configs, boolean isKey) { // do Nothing } @Override public User deserialize(String topic, byte[] data) { ByteBuffer allocate = ByteBuffer.allocate(data.length); allocate.put(data); allocate.flip(); int userId = allocate.getInt(); int length = allocate.getInt(); String userName = new String(data, 8, length); return new User(userId, userName); } @Override public void close() { // do Nothing }}消费者使用自定义反序列化器:
消费者在拉取了分区消息之后,要?先经过反序列化器对key和value进?反序列化处理。
处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应?程序进?处理。
消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V>接?。
configre?法获取消费者配置的属性,如果消费者配置中没有指定clientID,还可以获取KafkaConsumer?成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产?冲突。ConsumerInterceptor?法抛出的异常会被捕获、记录,但是不会向下传播。如果?户配置了错误的key或value类型参数,消费者不会抛出异常,?仅仅是记录下来。ConsumerInterceptor回调发?在org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)?法同?个线程该接?中有如下?法:
public interface ConsumerInterceptor<K, V> extends Configurable { /** * 该?法在poll?法返回之前调?。调?结束后poll?法就返回消息了。 * * 该?法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或?成新的消息。 * 如果有多个拦截器,则该?法按照KafkaConsumer的configs中配置的顺序调?。 * * @param records 由上个拦截器返回的由客户端消费的消息。 */ public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); /** * 当消费者提交偏移量时,调?该?法 * 该?法抛出的任何异常调?者都会忽略。 */ public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); /** * This is called when interceptor is closed */ public void close();}代码实现
自定义一个消费者拦截器:
public class OneInterceptor implements ConsumerInterceptor<String, String> { @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { // poll?法返回结果之前最后要调?的?法 System.out.println("One -- 开始"); // 消息不做处理,直接返回 return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { // 消费者提交偏移量的时候,经过该?法 System.out.println("One -- 结束"); } @Override public void close() { // ?于关闭该拦截器?到的资源,如打开的?件,连接的数据库等 } @Override public void configure(Map<String, ?> configs) { // ?于获取消费者的设置参数 configs.forEach((k, v) -> { System.out.println(k + "\t" + v); }); }}按照 OneInterceptor 拦截器复制两个拦截器,更名为 TwoInterceptor、ThreeInterceptor
消费者使用自定义拦截器:
位移提交介绍:
Committing Offsets)__consumer_offsetsKafka Consumer 后台提交
enable.auto.commit=trueauto.commit.interval.ms,默认 5s在消费者中设置自动提交和自动提交间隔:
Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "192.168.0.102:9092");configs.put("group.id", "mygrp");// 设置偏移量?动提交。?动提交是默认值。这?做示例。configs.put("enable.auto.commit", "true");// 偏移量?动提交的时间间隔configs.put("auto.commit.interval.ms", "3000");configs.put("key.deserializer", StringDeserializer.class);configs.put("value.deserializer", StringDeserializer.class);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);?动提交位移的顺序:
enable.auto.commit = true重复消费举例:
使? KafkaConsumer#commitSync():会提交 KafkaConsumer#poll()返回的最新 offset
该?法为同步操作,等待直到 offset 被成功提交才返回
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 }}commitSync 在处理完所有消息之后
?动同步提交可以控制offset提交的时机和频率
?动同步提交会:
KafkaConsumer#commitAsync()
while (true) { ConsumerRecords<String, String> records = consumer.poll(3_000); process(records); // 处理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) { handle(exception); } });}commitAsync出现问题不会?动重试
手动异步提交不会自动重试的解决方案:
try { while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 commitAysnc(); // 使?异步提交规避阻塞 }} catch(Exception e) { handle(e); // 处理异常} finally { try { consumer.commitSync(); // 最后?次提交使?同步阻塞式提交 } finally { consumer.close(); }}Kafka中,消费者根据消息的位移顺序消费消息。
消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题__consumer_offsets中。
Kafka提供了消费者API,让消费者可以管理??的位移。
KafkaConsumer<K, V> 的 API如下:
public void assign(Collection<TopicPartition> partitions)
说明:
给当前消费者?动分配?系列主题分区。
?动分配分区不?持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。
如果给出的主题分区是空的,则等价于调?unsubscribe?法。
?动分配主题分区的?法不使?消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。
?动分区分配assign(Collection)不能和?动分区分配subscribe(Collection,ConsumerRebalanceListener)?起使?。
如果启?了?动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进?异步提交。
public Set<TopicPartition> assignment()
说明:
获取给当前消费者分配的分区集合。如果订阅是通过调?assign?法直接分配主题分区,则返回相同的集合。如果使?了主题订阅,该?法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进?分区分配,或者正在重新分配分区,则会返回none。
public Map<String, List<PartitionInfo>> listTopics()
说明:
获取对?户授权的所有主题分区元数据。该?法会对服务器发起远程调?。
public List<PartitionInfo> partitionsFor(String topic)
说明:
获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调?。
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
说明:
对于给定的主题分区,列出它们第?个消息的偏移量。
注意,如果指定的分区不存在,该?法可能会永远阻塞。
该?法不改变分区的当前消费者偏移量。
public void seekToEnd(Collection<TopicPartition> partitions)
说明:
将偏移量移动到每个给定分区的最后?个。
该?法延迟执?,只有当调?过poll?法或position?法之后才可以使?。
如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后?个稳定的偏移量,即下?个要消费的消息现在还是未提交状态的事务消息。
public void seek(TopicPartition partition, long offset)
说明:
将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下?条要消费的消息偏移量。
若该?法多次调?,则最后?次的覆盖前?的。
如果在消费中间随意使?,可能会丢失数据。
public long position(TopicPartition partition)
说明:
检查指定主题分区的消费偏移量
public void seekToBeginning(Collection<TopicPartition> partitions)
说明:
将给定每个分区的消费者偏移量移动到它们的起始偏移量。该?法懒执?,只有当调?过poll?法或position?法之后才会执?。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。
准备数据
# ?成消息?件[root@node1 ~]# for i in `seq 60`; do echo "hello $i" >> nm.txt; done# 创建主题,三个分区,每个分区?个副本[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1# 将消息?产到主题中[root@node1 ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic tp_demo_01 < nm.txtAPI 实战:
/** * 消费者位移管理 */public class MyConsumer2 { public static void main(String[] args) { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.102:9092"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config); // 给当前消费者?动分配?系列主题分区 consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 1))); // 获取给当前消费者分配的分区集合 Set<TopicPartition> assignment = consumer.assignment(); assignment.forEach(topicPartition -> System.out.println(topicPartition)); // 获取对?户授权的所有主题分区元数据。该?法会对服务器发起远程调? Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics(); stringListMap.forEach((k, v) -> { System.out.println("主题:" + k); v.forEach(info -> System.out.println(info)); }); Set<String> strings = consumer.listTopics().keySet(); strings.forEach(topicName -> System.out.println(topicName)); // 获取指定主题的分区元数据 List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01"); for (PartitionInfo partitionInfo : partitionInfos) { Node leader = partitionInfo.leader(); System.out.println(leader); System.out.println(partitionInfo); // 当前分区在线副本 Node[] nodes = partitionInfo.inSyncReplicas(); // 当前分区下线副本 Node[] nodes1 = partitionInfo.offlineReplicas(); } // 对于给定的主题分区,列出它们第?个消息的偏移量。 // 注意,如果指定的分区不存在,该?法可能会永远阻塞。 // 该?法不改变分区的当前消费者偏移量。 Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment()); topicPartitionLongMap.forEach((k, v) -> { System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v); }); // 将偏移量移动到每个给定分区的最后?个。 consumer.seekToEnd(consumer.assignment()); //将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下?条要消费的消息偏移量。 consumer.seek(new TopicPartition("tp_demo_01", 1), 10); // 检查指定主题分区的消费偏移量 long position = consumer.position(new TopicPartition("tp_demo_01", 1)); System.out.println(position); // 将偏移量移动到每个给定分区的最后?个。 consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1))); // 关闭?产者 consumer.close(); }}重平衡可以说是kafka为?诟病最多的?个点了。
重平衡其实就是?个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每?个分区。?如?个topic有100个分区,?个消费者组内有20个消费者,在协调者的控制下让组内每?个消费者分配到5个分区,这个分配的过程就是重平衡。
重平衡的触发条件主要有三个:

消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
主题增加分区,需要主题分区和消费组进?再均衡。
由于使?正则表达式订阅主题,当增加的主题匹配正则表达式的时候,也要进?再均衡。
为什么说重平衡为?诟病呢?因为重平衡过程中,消费者?法从kafka消费消息,这对kafka的TPS影响极?,?如果kafka集内节点较多,?如数百个,那重平衡可能会耗时极多。数分钟到数?时都有可能,?这段时间kafka基本处于不可?状态。所以在实际环境中,应该尽量避免重平衡发?。
不可能完全避免重平衡,因为你?法完全保证消费者不会故障。?消费者故障其实也是最常?的引发重平衡的地?,所以我们需要保证尽?避免消费者故障。
?其他?种触发重平衡的?式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。
如果消费者真正挂掉了,就没办法了,但实际中,会有?些情况,kafka错误地认为?个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。
?先要知道哪些情况会出现错误判断挂掉的情况。
在分布式系统中,通常是通过?跳来维持分布式系统的,kafka也不例外。
在分布式系统中,由于?络问题你不清楚没接收到?跳,是因为对?真正挂了还是只是因为负载过重没来得及发??跳或是?络堵塞。所以?般会约定?个时间,超时即判定对?挂了。?在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。
还有?个参数,heartbeat.interval.ms,这个参数控制发送?跳的频率,频率越?越不容易被误判,但也会消耗更多资源。
此外,还有最后?个参数,max.poll.interval.ms,消费者poll数据后,需要?些处理,再进?拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过max.poll.interval.ms这个参数的值。这个参数的默认值是5分钟,?如果消费者接收到数据后会执?耗时的操作,则应该将其设置得??些。
总结:
session.timout.ms控制?跳超时时间。heartbeat.interval.ms控制?跳发送频率。max.poll.interval.ms控制poll的间隔。这?给出?个相对较为合理的配置,如下:
session.timout.ms:设置为6sheartbeat.interval.ms:设置2smax.poll.interval.ms:推荐为消费者处理消息最?耗时再加1分钟| 配置项 | 说明 |
|---|---|
| bootstrap.servers | 建?到Kafka集群的初始连接?到的host/port列表。 客户端会使?这?指定的所有的host/port来建?初始连接。 这个配置仅会影响发现集群所有节点的初始连接。 形式:host1:port1,host2:port2... 这个配置中不需要包含集群中所有的节点信息。 最好不要配置?个,以免配置的这个节点宕机的时候连不上。 |
| group.id | ?于定义当前消费者所属的消费组的唯?字符串。 如果使?了消费组的功能 subscribe(topic),或使?了基于Kafka的偏移量管理机制,则应该配置group.id。 |
| auto.commit.interval.ms | 如果设置了enable.auto.commit的值为true,则该值定义了消费者偏移量向Kafka提交的频率。 |
| auto.offset.reset | 如果Kafka中没有初始偏移量或当前偏移量在服务器中不存在(?如数据被删掉了): earliest:?动重置偏移量到最早的偏移量。 latest:?动重置偏移量到最后?个 none:如果没有找到该消费组以前的偏移量没有找到,就抛异常。 其他值:向消费者抛异常。 |
| fetch.min.bytes | 服务器对每个拉取消息的请求返回的数据量最?值。 如果数据量达不到这个值,请求等待,以让更多的数据累积,达到这个值之后响应请求。 默认设置是1个字节,表示只要有?个字节的数据,就?即响应请求,或者在没有数据的时候请求超时。 将该值设置为??点?的数字,会让服务器等待稍微??点?的时间以累积数据。 如此则可以提?服务器的吞吐量,代价是额外的延迟时间。 |
| fetch.max.wait.ms | 如果服务器端的数据量达不到fetch.min.bytes的话,服务器端不能?即响应请求。该时间?于配置服务器端阻塞请求的最?时?。 |
| fetch.max.bytes | 服务器给单个拉取请求返回的最?数据量。 消费者批量拉取消息,如果第?个?空消息批次的值?该值?,消息批也会返回,以让消费者可以接着进?。 即该配置并不是绝对的最?值。 broker可以接收的消息批最?值通过 message.max.bytes(broker配置)或max.message.bytes(主题配置)来指定。需要注意的是,消费者?般会并发拉取请求。 |
| enable.auto.commit | 如果设置为true,则消费者的偏移量会周期性地在后台提交。 |
| connections.max.idle.ms | 在这个时间之后关闭空闲的连接。 |
| check.crcs | ?动计算被消费的消息的CRC32校验值。 可以确保在传输过程中或磁盘存储过程中消息没有被破坏。 它会增加额外的负载,在追求极致性能的场合禁?。 |
| exclude.internal.topics | 是否内部主题应该暴露给消费者。如果该条?设置为true,则只能先订阅再拉取。 |
| isolation.level | 控制如何读取事务消息。 如果设置了 read_committed,消费者的poll()?法只会返回已经提交的事务消息。如果设置了 read_uncommitted(默认值),消费者的poll?法返回所有的消息,即使是已经取消的事务消息。?事务消息以上两种情况都返回。 消息总是以偏移量的顺序返回。 read_committed只能返回到达LSO的消息。在LSO之后出现的消息只能等待相关的事务提交之后才能看到。结果, read_committed模式,如果有为提交的事务,消费者不能读取到直到HW的消息。read_committed的seekToEnd?法返回LSO。 |
| heartbeat.interval.ms | 当使?消费组的时候,该条?指定消费者向消费者协调器发送?跳的时间间隔。 ?跳是为了确保消费者会话的活跃状态,同时在消费者加?或离开消费组的时候?便进?再平衡。 该条?的值必须?于 session.timeout.ms,也不应该?于session.timeout.ms的1/3。可以将其调整得更?,以控制正常重新平衡的预期时间。 |
| session.timeout.ms | 当使?Kafka的消费组的时候,消费者周期性地向broker发送?跳数据,表明??的存在。 如果经过该超时时间还没有收到消费者的?跳,则broker将消费者从消费组移除,并启动再平衡。 该值必须在broker配置 group.min.session.timeout.ms和group.max.session.timeout.ms之间。 |
| max.poll.records | ?次调?poll()?法返回的记录最?数量。 |
| max.poll.interval.ms | 使?消费组的时候调?poll()?法的时间间隔。 该条?指定了消费者调?poll()?法的最?时间间隔。 如果在此时间内消费者没有调?poll()?法,则broker认为消费者失败,触发再平衡,将分区分配给消费组中其他消费者。 |
| max.partition.fetch.bytes | 对每个分区,服务器返回的最?数量。消费者按批次拉取数据。 如果?空分区的第?个记录?于这个值,批处理依然可以返回,以保证消费者可以进?下去。 broker接收批的??由 message.max.bytes(broker参数)或max.message.bytes(主题参数)指定。fetch.max.bytes?于限制消费者单次请求的数据量。 |
| send.buffer.bytes | ?于TCP发送数据时使?的缓冲??(SO_SNDBUF),-1表示使?OS默认的缓冲区??。 |
| retry.backoff.ms | 在发?失败的时候如果需要重试,则该配置表示客户端等待多?时间再发起重试。 该时间的存在避免了密集循环。 |
| request.timeout.ms | 客户端等待服务端响应的最?时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。 |
| reconnect.backoff.ms | 重新连接主机的等待时间。避免了重连的密集循环。 该等待时间应?于该客户端到broker的所有连接。 |
| reconnect.backoff.max.ms | 重新连接到反复连接失败的broker时要等待的最?时间(以毫秒为单位)。 如果提供此选项,则对于每个连续的连接失败,每台主机的退避将成倍增加,直?达到此最?值。 在计算退避增量之后,添加20%的随机抖动以避免连接?暴。 |
| receive.buffer.bytes | TCP连接接收数据的缓存(SO_RCVBUF)。-1表示使?操作系统的默认值。 |
| partition.assignment.strategy | 当使?消费组的时候,分区分配策略的类名。 |
| metrics.sample.window.ms | 计算指标样本的时间窗?。 |
| metrics.recording.level | 指标的最?记录级别。 |
| metrics.num.samples | ?于计算指标?维护的样本数量 |
| interceptor.classes | 拦截器类的列表。默认没有拦截器拦截器是消费者的拦截器,该拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接?。拦截器可?于对消费者接收到的消息进?拦截处理。 |
consumer group是kafka提供的可扩展且具有容错性的消费者机制。
三个特性:
group.id 是?个字符串,唯?标识?个消费组消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。
每个消费组保存??的位移信息,那么只需要简单的?个整数表示位置就够了;同时可以引?checkpoint机制定期持久化。
?动VS?动
Kafka默认定期?动提交位移(enable.auto.commit = true),也?动提交位移。另外kafka会定期把group消费情况保存起来,做成?个offset map,如下图所示:

位移提交
位移是提交到Kafka中的__consumer_offsets主题。__consumer_offsets中的消息保存了每个消费组某?时刻提交的offset信息。
[root@localhost kafka_2.12-1.0.2]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config /usr/src/kafka_2.12-1.0.2/config/consumer.properties --from-beginning | head
上图中,标出来的,表示消费组为console-consumer-46068,消费的主题为topic_1,消费的分区是0,偏移量为5。
__consumers_offsets主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的?志容量,也能实现保存最新offset的?的。
什么是再平衡:
再均衡(Rebalance)本质上是?种协议,规定了?个消费组中所有消费者如何达成?致来分配订阅主题的每个分区。
?如某个消费组有20个消费组,订阅了?个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。
什么时候再平衡:
再均衡的触发条件:
如何进行组内分区分配:
三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor
谁来执?再均衡和消费组管理:
Kafka提供了?个??:Group Coordinator来执?对于消费组的管理。
Group Coordinator——每个消费组分配?个消费组协调器?于组管理和位移管理。当消费组的第?个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。
如何确定coordinator:
确定消费组位移信息写?__consumers_offsets的哪个分区。具体计算公式:
_consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
该分区leader所在的broker就是组协调器。
Rebalance Generation:
它表示Rebalance之后主题分区到消费组中消费者映射关系的?个版本,主要是?于保护消费组,隔离?效偏移量提交的。如上?个版本的消费者?法提交位移到新版本的消费组中,因为映射关系变了,你消费的或许已经不是原来的那个分区了。每次group进?Rebalance之后,Generation号都会加1,表示消费组和分区的映射关系到了?个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进?Generation 2,之后成员4加?,再次触发Rebalance,消费组进?Generation 3.
协议(protocol)
kafka提供了5个协议来处理与消费组协调相关的问题:
组协调器在再均衡的时候主要?到了前?4种请求。
liveness
消费者如何向消费组协调器证明??还活着?
通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。?旦协调器认为某个消费者挂了,那么它就会开启新?轮再均衡,并且在当前其他消费者的?跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。
再均衡过程
再均衡分为2步:Join和Sync

注意:在协调器收集到所有成员请求前,它会把已收到请求放??个叫purgatory(炼狱)的地?。然后是分发分配?案的过程,即SyncGroup请求:
消费组状态机
消费组组协调器根据状态机对消费组做不同的处理:
说明: