Kafka消费者详解
2025/8/15大约 4 分钟
Apache Kafka消费者详解
前置知识
在学习本文之前,请确保您已经:
- 了解Kafka基本概念
- 掌握Java并发编程
- 熟悉消费者组机制
消费者工作原理
1. 整体架构
Kafka消费者的工作流程:
- 订阅主题:指定要消费的Topic
- 分区分配:消费者组内分配分区
- 拉取消息:从分区拉取消息
- 消息处理:处理消息并提交位移
- 位移管理:管理消费位置
2. 消费者组
消费者组的特点:
- 组内消费者共同消费消息
- 每个分区只能被组内一个消费者消费
- 支持动态增减消费者
- 自动进行再均衡(Rebalance)
核心配置
1. 基础配置
@Data
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// 基础连接配置
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
// 反序列化配置
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(config);
}
}
2. 消费配置
// 消费相关配置
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 位移重置策略
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 批量拉取数量
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 最大拉取间隔
3. 性能配置
// 性能相关配置
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); // 最小获取字节数
config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800); // 最大获取字节数
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 最大等待时间
消费模式
1. 自动提交位移
自动提交位移示例代码
@Slf4j
public class AutoCommitConsumer {
public void consume(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("Received message: {}", record.value());
// 处理消息
processMessage(record);
}
}
}
}
private void processMessage(ConsumerRecord<String, String> record) {
// 消息处理逻辑
}
}
2. 手动提交位移
手动提交位移示例代码
@Slf4j
public class ManualCommitConsumer {
public void consume(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processMessage(record);
// 处理成功后提交位移
consumer.commitSync(
Collections.singletonMap(
new TopicPartition(record.topic(),
record.partition()),
new OffsetAndMetadata(record.offset() + 1)
)
);
} catch (Exception e) {
log.error("Error processing message", e);
// 处理失败的逻辑
}
}
}
}
}
}
高级特性
1. 消费者拦截器
@Slf4j
public class LoggingConsumerInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(
ConsumerRecords<String, String> records) {
log.info("Consuming {} records", records.count());
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
log.info("Committing offsets: {}", offsets);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
2. 再均衡监听器
@Slf4j
public class RebalanceListener implements ConsumerRebalanceListener {
private final Consumer<String, String> consumer;
private final Map<TopicPartition, OffsetAndMetadata> currentOffsets;
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("Partitions revoked: {}", partitions);
// 提交当前位移
consumer.commitSync(currentOffsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Partitions assigned: {}", partitions);
// 清空当前位移缓存
currentOffsets.clear();
}
}
最佳实践
1. 消费者配置
配置建议
- 合理设置fetch.min.bytes
- 适当调整max.poll.records
- 设置合适的session.timeout.ms
- 配置合理的max.poll.interval.ms
2. 异常处理
常见问题处理
CommitFailedException
- 提交位移超时
- 检查网络连接
- 增加重试机制
WakeupException
- 正常的中断信号
- 优雅关闭消费者
RecordTooLargeException
- 消息太大
- 调整fetch.max.bytes
3. 性能优化
性能提升
- 批量处理消息
- 并行处理消息
- 合理设置缓冲区
- 使用压缩传输
监控指标
1. 消费延迟监控
@Slf4j
public class ConsumerLagMonitor {
private final Consumer<String, String> consumer;
public Map<TopicPartition, Long> getLag() {
Map<TopicPartition, Long> lags = new HashMap<>();
// 获取分配的分区
Set<TopicPartition> partitions = consumer.assignment();
// 获取最新位移
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(new ArrayList<>(partitions));
// 计算延迟
for (TopicPartition partition : partitions) {
long currentOffset = consumer.position(partition);
long endOffset = endOffsets.get(partition);
lags.put(partition, endOffset - currentOffset);
}
return lags;
}
}
总结
本文详细介绍了Kafka消费者的:
- ✅ 工作原理和架构
- ✅ 核心配置选项
- ✅ 消费模式选择
- ✅ 高级特性应用
- ✅ 最佳实践建议
下一步学习
- 深入了解事务机制
- 掌握消费者组管理
- 学习监控运维
希望这篇文章能帮助您更好地使用Kafka消费者!如果您有任何问题,欢迎在评论区讨论。