Skip to content

Kafka消息积压问题排查与解决方案

先搞清楚为什么会积压

面试官问你Kafka消息积压怎么解决,其实他想看的是你有没有真正处理过这种问题,能不能分析出问题的根源。所以咱们先得搞清楚,消息为啥会积压。

说白了,消息积压就是生产的速度比消费的速度快,消息在Broker里越攒越多。就像你家门口堆快递,送的比你拆的快,时间一长门口就堆满了。

消费端处理太慢

这是最常见的原因,消费者那边处理消息的速度跟不上。一般有这么几种情况:

业务逻辑太重了

你想啊,每条消息过来,你要查数据库、调第三方接口、写日志、发通知,一套流程下来几百毫秒甚至几秒,消息不积压才怪。我之前遇到过一个case,消费一条消息要调三个外部服务,其中一个服务响应特别慢,直接把整个消费链路拖垮了。

下游服务挂了或者变慢了

消费逻辑里调用的数据库、Redis、第三方接口如果响应变慢或者直接挂了,消费速度立马就下来了。这种情况下消费者可能还在不停地重试,CPU和网络都在空转,但就是消费不动。

消费者线程太少

Kafka消费者默认是单线程拉取消息的,如果你业务处理也是单线程,那处理能力就很有限。有的团队图省事,一个消费者实例就起一个线程处理消息,分区数设了10个,但只有一个消费者实例,那9个分区就相当于闲置了。

代码写得有问题

比如每条消息都去建数据库连接,用完不释放;或者有内存泄漏导致频繁GC;甚至有死锁、线程阻塞这种问题。这些bug平时流量低的时候看不出来,一上量就暴露了。

生产端流量突然暴增

有时候不是消费端的锅,而是生产端那边流量突然上来了:

  • 大促活动没做好预估,流量比预期高好几倍
  • 上游系统出bug,疯狂重复发消息
  • 批量任务或者数据同步任务把消息一股脑儿全推过来了
  • 某个业务逻辑写错了,循环里发消息没控制好

我见过最离谱的一次,开发同学写了个定时任务,本来应该增量同步数据,结果每次都全量同步,每天凌晨往Kafka里灌几百万条消息,消费者根本扛不住。

分区数和消费者数不匹配

Kafka的消费规则是一个分区只能被同一消费组内的一个消费者消费。所以如果你Topic有10个分区,但消费组里只有2个消费者,那每个消费者要负责5个分区,压力就大了。

反过来,如果消费者数量超过分区数,多出来的消费者就是摆设,啥也干不了,浪费资源。

Broker或网络出问题

这种情况相对少见,但也得考虑:

  • Broker磁盘IO打满了,写入读取都变慢
  • 网络抖动导致消费者频繁重连、rebalance
  • Kafka集群本身有节点宕机,触发分区迁移

怎么快速定位问题在哪

积压发生了,别慌,先定位问题出在哪一环。

看消费延迟指标

第一步肯定是看Consumer Lag,也就是消费延迟。这个指标表示当前消费位点和最新消息位点之间的差距。

bash
# 用kafka自带的工具查看消费组的lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group your-consumer-group

执行完会看到这样的输出:

plain
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-topic     0         12345          56789         44444
order-topic     1         23456          67890         44434
order-topic     2         34567          78901         44334

LAG那一列就是积压的消息数量。如果LAG一直在涨,说明消费速度确实跟不上。

确认是生产快了还是消费慢了

看两个指标:

生产速率:看Broker的MessagesInPerSec指标,确认消息进来的速度是不是比平时高很多。

消费速率:看消费者的处理速度,可以在业务代码里打点统计,或者看消费者的records-consumed-rate指标。

如果生产速率暴增但消费速率正常,那就是流量问题;如果生产速率正常但消费速率下降了,那就是消费端的问题。

检查消费者的健康状态

看看消费者实例是不是都正常工作:

  • 消费者进程有没有挂掉或者假死
  • 有没有频繁发生rebalance(看日志里的Rebalance关键字)
  • 消费者的GC情况,有没有长时间的Full GC
bash
# 查看消费组内有多少活跃的消费者
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group your-consumer-group --members

排查下游依赖

如果消费逻辑里有调用其他服务,检查一下:

  • 数据库连接池是不是满了
  • Redis有没有响应变慢
  • 调用的HTTP接口有没有超时
  • 消息处理有没有抛异常导致不停重试

紧急情况下怎么快速消化积压

如果积压已经很严重了,影响到业务了,得先把积压的消息消化掉,然后再慢慢优化。

临时扩容消费者

最直接的办法就是加消费者。但记住一点:消费者数量最多等于分区数。如果你现在有10个分区,3个消费者,那最多可以扩到10个消费者。

java
// 确保你的消费者配置了正确的消费组
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "your-consumer-group");
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "500"); // 每次拉取的消息数,可以适当调大

如果消费者数量已经等于分区数了,那就得考虑扩分区。不过扩分区是个大动作,要提前评估好。

临时增加分区数

bash
# 把分区数从10扩到20
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic your-topic --partitions 20

注意几个点:

  • Kafka分区只能增加不能减少
  • 扩分区后新消息会分散到新分区,但旧消息还在原来的分区
  • 如果用了key来分区,扩分区会影响消息的分布

临时跳过不重要的消息

这招比较激进,但有时候真的管用。如果积压的消息里有一部分是可以丢弃的(比如过时的通知、可以容忍丢失的日志),可以临时把这部分跳过。

java
// 方法一:直接重置消费位点到最新
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group your-consumer-group --topic your-topic \
  --reset-offsets --to-latest --execute

// 方法二:在代码里根据消息时间戳判断,过时的直接丢弃
if (message.timestamp() < System.currentTimeMillis() - 3600000) {
    // 消息超过1小时了,直接跳过
    log.warn("跳过过期消息: {}", message.key());
    return;
}

用批量处理提升吞吐量

如果消费逻辑支持批量处理,尽量改成批量的方式:

java
// 原来是一条一条处理
for (ConsumerRecord<String, String> record : records) {
    orderService.processOrder(record.value()); // 每条消息调一次数据库
}

// 改成批量处理
List<Order> orderList = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
    orderList.add(parseOrder(record.value()));
}
// 批量入库,一次数据库操作搞定
orderService.batchSaveOrders(orderList);

批量处理能大幅减少IO次数,提升吞吐量。

临时转储到别的地方

如果实在消费不过来,可以先把消息转储到其他地方,比如写到文件或者另一个队列,之后再慢慢处理。

mermaid
graph LR
    A[积压的Topic] --> B[临时消费者]
    B --> C[批量写入文件/数据库]
    C --> D[后续慢慢处理]
    
    A --> E[正常消费者]
    E --> F[处理新消息]

这样能保证新来的消息正常处理,积压的消息走另一条路慢慢消化。

从根上解决问题

紧急处理完了,还得从根本上优化,不然下次还会出问题。

优化消费端处理逻辑

异步化处理

不是所有逻辑都得同步执行。比如发通知、记日志这些操作,完全可以异步处理:

java
@KafkaListener(topics = "order-topic")
public void consume(String message) {
    Order order = parseOrder(message);
    
    // 核心逻辑同步执行
    orderService.saveOrder(order);
    
    // 非核心逻辑异步执行
    CompletableFuture.runAsync(() -> {
        notificationService.sendOrderNotification(order);
        logService.recordOrderLog(order);
    });
}

多线程消费

单线程消费改成多线程:

java
@Configuration
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(10); // 10个线程并发消费
        factory.setBatchListener(true); // 开启批量消费
        return factory;
    }
}

减少不必要的IO操作

  • 能用缓存的用缓存,别每次都查数据库
  • 批量操作代替单条操作
  • 连接池复用,别每次都新建连接

合理设置分区数

分区数的设置要综合考虑:

  • 预期的吞吐量:单分区的写入吞吐量大概在几十MB/s,消费吞吐量取决于消费逻辑
  • 消费者数量:分区数最好是消费者数量的整数倍
  • 不要太多也不要太少:太少并行度不够,太多会增加Broker负担

一个经验公式:分区数 = max(期望吞吐量 / 单分区吞吐量, 消费者实例数)

做好限流和背压

在生产端做限流,防止流量暴增打垮消费端:

java
// 使用RateLimiter控制发送速率
RateLimiter rateLimiter = RateLimiter.create(10000); // 每秒最多发1万条

public void sendMessage(String message) {
    rateLimiter.acquire();
    kafkaTemplate.send("your-topic", message);
}

在消费端实现背压机制,消费不过来的时候主动降速:

java
// 消费端根据处理能力动态调整拉取速度
props.put("max.poll.records", "100"); // 每次少拉点
props.put("max.poll.interval.ms", "300000"); // 处理时间放长一点

完善监控告警

监控是发现问题的第一道防线,这几个指标必须监控:

指标含义告警阈值建议
Consumer Lag消费延迟(积压消息数)根据业务容忍度设置,比如超过10000就告警
Lag持续时间Lag持续高位的时间超过5分钟告警
消费速率每秒消费的消息数低于历史均值50%告警
消费者数量活跃的消费者实例数少于预期数量告警
mermaid
graph TD
    A[Kafka监控] --> B[Consumer Lag监控]
    A --> C[消费速率监控]
    A --> D[Broker健康监控]
    
    B --> E{Lag > 阈值?}
    E -->|是| F[触发告警]
    E -->|否| G[正常]
    
    F --> H[通知oncall]
    H --> I[排查处理]

面试怎么回答这个问题

面试的时候,别一上来就说解决方案,得有个分析问题的过程。可以这样组织回答:

第一步:先定位问题

"首先我会看Consumer Lag指标,确认积压的严重程度。然后对比生产速率和消费速率,判断是生产端流量暴增还是消费端处理变慢了。"

第二步:紧急处理

"如果是紧急情况,我会先采取临时措施快速消化积压,比如扩容消费者、跳过不重要的消息、或者把积压的消息转储到其他地方后续处理。"

第三步:根本解决

"紧急处理完之后,要从根本上优化。消费端可以用多线程消费、异步处理、批量操作来提升吞吐量。生产端要做好限流防止流量暴增。同时完善监控告警,提前发现问题。"

第四步:总结经验

"处理完之后要做复盘,分析这次积压的根本原因,评估现有架构能支撑的流量上限,必要的话提前扩容或者优化架构。"

这样回答既展示了你的问题分析能力,也展示了你的实战经验和系统思维。

小结

消息积压这个问题,说简单也简单,说复杂也复杂。核心就是生产和消费的速度平衡

解决问题的思路:

  1. 先定位是哪一环出了问题
  2. 紧急情况先止血
  3. 然后从根本上优化
  4. 最后做好监控预防

记住一点:事后处理永远不如事前预防。把监控告警做好,在问题刚出现苗头的时候就介入处理,比积压成山再来救火要轻松得多。

更新: 2026-01-05 10:56:32
原文: https://www.yuque.com/u22210564/zoxfmt/fqlcuud558tlhl2u

Java 后端面试知识库