以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务。传统的调用方式是同步调用,这会存在一定的性能问题

使用消息队列可以实现异步的通信方式,相比于同步的通信?式,异步的?式可以让上游快速成功,极大提高系统的吞吐量。在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终?致性

Kafka 是?个分布式的、?持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:
| 名称 | 解释 |
|---|---|
| Broker | 消息中间件处理节点,?个 Kafka 节点就是?个 broker,?个或者多个 Broker 可以组成?个 Kafka 集群 |
| Topic | Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定?个 topic |
| Producer | 消息生产者,向 Broker 发送消息的客户端 |
| Consumer | 消息消费者,从 Broker 读取消息的客户端 |
| ConsumerGroup | 每个 Consumer 属于?个特定的 Consumer Group,?条消息可以被多个不同的 Consumer Group 消费,但是?个 Consumer Group 中只能有?个 Consumer 能够消费该消息 |
| Partition | 物理上的概念,?个 topic 可以分为多个 partition,每个 partition 内部消息是有序的 |
安装 Kafka 之前需要先安装 JDK 和 Zookeeper,在官网下载 Kafka 安装包:http://kafka.apache.org/downloads,直接解压即可
需要修改配置文件,进?到 config 目录内,修改 server.properties
# broker.id 属性在 kafka 集群中必须唯一broker.id= 0# kafka 部署的机器 ip 和提供服务的端口号listeners=PLAINTEXT://192.168.65.60:9092# kafka 的消息存储文件log.dir=/usr/local/data/kafka-logs# kafka 连接 zookeeper 的地址zookeeper.connect= 192.168.65.60:2181server.properties 核心配置详解:
| Property | Default | Description |
|---|---|---|
| broker.id | 0 | 每个 broker 都可以用?个唯?的非负整数 id 进行标识,作为 broker 的 名字 |
| log.dirs | /tmp/kafka-logs | kafka 存放数据的路径,这个路径并不是唯?的,可以是多个,路径之间只需要使?逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行 |
| listeners | PLAINTEXT://192.168.65.60:9092 | server 接受客户端连接的端?,ip 配置 kafka 本机 ip 即可 |
| zookeeper.connect | localhost:2181 | zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接?式为 hostname1:port1,hostname2:port2,hostname3:port3 |
| log.retention.hours | 168 | 每个日志文件删除之前保存的时间,默认数据保存时间对所有 topic 都?样 |
| num.partitions | 1 | 创建 topic 的默认分区数 |
| default.replication.factor | 1 | ?动创建 topic 的默认副本数量,建议设置为?于等于 2 |
| min.insync.replicas | 1 | 当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每?个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常 |
| delete.topic.enable | false | 是否允许删除主题 |
进入到 bin 目录下,使用命令来启动
./kafka-server-start.sh -daemon../config/server.properties验证是否启动成功:进入到 zk 中的节点看 id 是 0 的 broker 有没有存在(上线)
ls /brokers/ids/topic 可以实现消息的分类,不同消费者订阅不同的 topic

执行以下命令创建名为 test 的 topic,这个 topic 只有一个 partition,并且备份因子也设置为 1
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test查看当前 kafka 内有哪些 topic
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181把消息发送给 broker 中的某个 topic,打开?个 kafka 发送消息的客户端,然后开始?客户端向 kafka 服务器发送消息
kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到 kafka 集群中。在默认情况下,每一个行会被当做成一个独立的消息
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test对于 consumer,kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用 kafka 的消费者客户端,从指定 kafka 服务器的指定 topic 中消费消息
方式一:从最后一条消息的 偏移量+1 开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test消息的发送方会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进行存储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后一个消息的下一个偏移量开始消费
一个消费组里只有一个消费者能消费到某一个 topic 中的消息,可以创建多个消费者,这些消费者在同一个消费组中
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka 实现多播,只需要让不同的消费者处于不同的消费组即可
./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test# 查看当前主题下有哪些消费组./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup

生产者将消息发送给 broker,broker 会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log消息的保存是有序的,通过 offset 偏移量来描述消息的有序性
消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置
主题 Topic 在 kafka 中是?个逻辑概念,kafka 通过 topic 将消息进行分类。不同的 topic 会被订阅该 topic 的消费者消费。但是有?个问题,如果说这个 topic 的消息非常多,消息是会被保存到 log 日志文件中的,这会出现文件过大的问题,因此,kafka 提出了 Partition 分区的概念

通过 partition 将?个 topic 中的消息分区来存储,这样的好处有多个:
为?个主题创建多个分区
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1通以下命令查看 topic 的分区信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1分区的作用:
了解了 Partition,再补充一个 Kafka 细节:在消息日志文件中,kafka 内部创建了 __consumer_offsets 主题包含了 50 个分区。这个主题用来存放消费者某个主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给 kafka 中的默认主题:consumer_offsets。因此 kafka 为了提升这个主题的并发性,默认设置了 50 个分区
hash(consumerGroupId) % __consumer_offsets 主题的分区数创建三个 server.properties 文件
# 0 1 2broker.id=2# 9092 9093 9094listeners=PLAINTEXT://192.168.65.60:9094# kafka-logs kafka-logs-1 kafka-logs-2log.dir=/usr/local/data/kafka-logs-2通过命令启动三台 broker
./kafka-server-start.sh -daemon../config/server0.properties./kafka-server-start.sh -daemon../config/server1.properties./kafka-server-start.sh -daemon../config/server2.properties搭建完后通过查看 zk 中的 /brokers/ids 看是否启动成功
下面的命令,在创建主题时,除了指明了主题的分区数以外,还指明了副本数,分别是:一个主题,两个分区、三个副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic通过查看主题信息,其中的关键数据:

replicas:当前副本所存在的 broker 节点
leader:副本里的概念
follower:leader 处理所有针对这个 partition 的读写请求,而 follower 被动复制 leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果 leader 所在的 broker 挂掉,那么就会进行新 leader 的选举
isr:可以同步的 broker 节点和已同步的 broker 节点,存放在 isr 集合中
Kafka 集群中由多个 broker 组成,?个 broker 存放?个 topic 的不同 partition 以及它们的副本

./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version></dependency>/** * 消息的发送方 */public class MyProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094"); // 把发送的 key 从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 把发送消息 value 从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 2.创建?产消息的客户端,传?参数 Producer<String, String> producer = new KafkaProducer<String, String>(props); // 3.创建消息 // key: 作?是决定了往哪个分区上发 // value: 具体要发送的消息内容 ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka"); // 4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println("同步?式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); }}ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));如果未指定分区,则会通过业务 Key 的 hash 运算,得出要发送的分区,公式为:hash(key)%partitionNum
?产者同步发消息,在收到 kafka 的 ack 告知发送成功之前将?直处于阻塞状态
// 等待消息发送成功的同步阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());异步发送,?产者发送完消息后就可以执?之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法
// 指定发送分区ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));// 异步回调方式发送消息producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null) { System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset()); } }});在同步发送的前提下,生产者在获得集群返回的 ack 之前会?直阻塞,那么集群什么时候返回 ack 呢?此时 ack 有三个配置:
min.insync.replicas(默认为 1 ,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证,一般是金融级别,或跟钱打交道的场景才会使用这种配置props.put(ProducerConfig.ACKS_CONFIG, "1");// 发送失败,默认会重试三次,每次间隔 100msprops.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100)
kafka 默认会创建?个消息缓冲区,用来存放要发送的消息,缓冲区是 32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);kafka 本地线程会在缓冲区中?次拉 16k 的数据,发送到 broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发到 broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);public class MySimpleConsumer { private final static String TOPIC_NAME = "my-replicated-topic"; private final static String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094"); // 消费分组名 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 1.创建?个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 2.消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * 3. poll() API 是拉取消息的?轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { // 4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }}无论是自动提交还是手动提交,都需要把所属的 消费组 + 消费的某个主题 + 消费的某个分区 + 消费的偏移量 提交到集群的 _consumer_offsets 主题里面
自动提交:消费者 poll 消息下来以后自动提交 offset
// 是否自动提交 offset,默认就是 trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交 offset 的间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");注意:如果消费者还没消费完 poll 下来的消息就自动提交了偏移量,此时消费者挂了,于是下?个消费者会从已提交的 offset 的下?个位置开始消费消息,之前未被消费的消息就丢失掉了
手动提交:需要把自动提交的配置改成 false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");手动提交又分成了两种:
手动同步提交
在消费完消息后调用同步提交的方法,当集群返回 ack 前?直阻塞,返回 ack 后表示提交成功,执行之后的逻辑
while (true) { /* * poll() API 是拉取消息的?轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 有消息 // ?动同步提交 offset, 当前线程会阻塞直到 offset 提交成功 // ?般使?同步提交, 因为提交之后?般也没有什么逻辑代码了 consumer.commitSync(); // ====阻塞=== 提交成功 }}手动异步提交
在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置?个回调方法,供集群调用
while (true) { /* * poll() API 是拉取消息的?轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } // 所有的消息已消费完 if (records.count() > 0) { // 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后?的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for " + offsets); System.err.println("Commit failed exception: " + exception.getStackTrace()); } } }); }}消费者建立与 broker 之间的长连接,开始 poll 消息,默认?次 poll 五百条消息
// ?次 poll 最?拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)可以根据消费速度的快慢来设置,如果两次 poll 的时间超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者
代码中设置了长轮询的时间是 1000 毫秒
while (true) { /* * poll() API 是拉取消息的?轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }}消费者每隔 1s 向 Kafka 集群发送心跳,集群发现如果有超过 10s 没有续约的消费者,将被踢出消费组,触发该消费组的 rebalance 机制,将该分区交给消费组里的其他消费者进行消费
// consumer 给 broker 发送?跳的间隔时间props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);// kafka 如果超过 10 秒没有收到消费者的?跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));也即从头开始消费消息
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);// 从一小时前开始消费long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) { continue; } // 根据消费?的 timestamp 确定 offset Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); }}新消费组中的消费者在启动以后,默认会从当前分区的最后?条消息的 offset+1 开始消费(消费新消息),可以通过以下的设置,让新的消费者第?次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>server: port: 8080spring: kafka: bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 producer: # ?产者 retries: 3 # 设置?于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定消息key和消息体的编解码?式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 500 listener: # 当每?条记录被消费者监听器(ListenerConsumer)处理之后提交 # RECORD # 当每?批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交 # BATCH # 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间?于TIME时提交 # TIME # 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量?于等于COUNT时提交 # COUNT # TIME | COUNT 有?个条件满?时提交 # COUNT_TIME # 当每?批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ?动调?Acknowledgment.acknowledge()后提交 # MANUAL # 手动调?Acknowledgment.acknowledge()后?即提交,?般使?这种 # MANUAL_IMMEDIATE ack-mode: MANUAL_IMMEDIATE redis: host: 172.16.253.21import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/msg")public class MyKafkaController { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") public String sendMessage(){ kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!"); return "send success!"; }}import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;@Componentpublic class MyConsumer { @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); // 手动提交offset ack.acknowledge(); }}配置消费主题、分区和偏移量
@KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同组下的消费者个数,就是并发消费数,建议?于等于分区总数public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); // 手动提交offset ack.acknowledge();}Kafka 集群中的 broker 在 zookeeper 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的 controller,负责管理整个集群中的所有分区和副本的状态:
如果消费者没有指明分区消费,那么当消费组里消费者和分区的关系发生变化,就会触发 rebalance 机制,重新调整消费者该消费哪个分区
在触发 rebalance 机制之前,消费者消费哪个分区有三种分配策略:
(分区总数/消费者数量)+1,之后的消费者是 分区总数/消费者数量LEO 是某个副本最后消息的消息位置(log-end-offset),HW 是已完成同步的位置。消息在写入 broker 时,且每个 broker 完成这条消息的同步后,HW 才会变化。在这之前,消费者是消费不到这条消息的,在同步完成之后,HW 更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失

生产者:
消费者:
如果生产者发送完消息后,却因为网络抖动,没有收到 ack,但实际上 broker 已经收到了。此时生产者会进行重试,于是 broker 就会收到多条相同的消息,而造成消费者的重复消费
解决方案:
生产者:使用同步发送,ack 设置成非 0 的值
消费者:主题只能设置?个分区,消费组中只能有?个消费者
所谓消息积压,就是消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩
解决方案:

假设一个应用场景:订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现,实现方案如下:
