Kafka入门教程
2025/8/15大约 4 分钟
Apache Kafka入门教程
前置知识
在开始本教程之前,建议您具备以下基础知识:
- Java基础知识
- Maven项目管理
- 分布式系统基础
- 消息队列概念
什么是Kafka?
Apache Kafka是一个分布式流处理平台,具有以下特点:
- 高吞吐量:支持每秒数百万条消息的处理
- 可扩展性:支持集群动态扩展
- 持久化:消息数据持久化到磁盘
- 高可用性:支持数据复制和故障转移
核心概念
Topic(主题)
- 消息的逻辑分类
- 可以有多个分区(Partition)
- 每个分区都是有序的消息队列
Partition(分区)
- Topic物理上的分组
- 提供并行处理能力
- 每个分区都有一个leader和多个follower
Producer(生产者)
- 发送消息到Topic
- 可以指定分区策略
Consumer(消费者)
- 从Topic读取消息
- 可以组成消费者组
Broker(代理)
- Kafka服务器实例
- 负责消息的存储和转发
环境搭建
1. 下载安装
访问Kafka官网下载最新版本:
# 解压下载的文件
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
# 启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动Kafka
bin/kafka-server-start.sh config/server.properties
2. Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
基本操作
1. 创建Topic
@Slf4j
public class TopicCreator {
public static void createTopic(String topicName, int partitions, short replicationFactor) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
admin.createTopics(Collections.singleton(newTopic)).all().get();
log.info("Topic {} created successfully", topicName);
} catch (Exception e) {
log.error("Failed to create topic: {}", e.getMessage());
}
}
}
2. 简单消息发送
@Slf4j
public class SimpleProducer {
public static void sendMessage(String topic, String message) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, message);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("Message sent successfully to topic {} partition {}",
metadata.topic(), metadata.partition());
} else {
log.error("Failed to send message: {}", exception.getMessage());
}
});
}
}
}
3. 简单消息消费
@Slf4j
public class SimpleConsumer {
public static void consumeMessages(String topic, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
log.info("Received message: {}", record.value());
}
}
}
}
}
关键特性
1. 消息持久化
Kafka将所有消息持久化到磁盘,提供了数据持久性保证:
- 消息写入磁盘后才确认
- 支持多副本备份
- 可配置数据保留策略
2. 分区机制
分区提供了并行处理能力:
- 每个主题可以有多个分区
- 分区数据分布在不同broker上
- 支持自定义分区策略
3. 消费者组
消费者组提供了消息消费的扩展性:
- 组内消费者共同消费消息
- 自动负载均衡
- 故障时自动重平衡
最佳实践
生产者最佳实践
- 合理配置批量发送
- 使用合适的确认机制
- 实现重试策略
- 监控发送性能
消费者最佳实践
- 正确设置消费者组
- 合理配置提交策略
- 优雅处理重平衡
- 实现幂等性消费
常见问题
1. 如何保证消息不丢失?
生产者端:
- 使用同步发送
- 设置acks=all
- 实现重试机制
Broker端:
- 配置副本数
- 等待ISR确认
消费者端:
- 手动提交位移
- 处理完成后再提交
2. 如何处理重复消息?
生产者端:
- 使用幂等性机制
- 设置事务ID
消费者端:
- 实现幂等性消费
- 记录消息ID
- 使用去重表
总结
本文介绍了Kafka的:
- ✅ 核心概念和特性
- ✅ 基本环境搭建
- ✅ 简单消息收发
- ✅ 关键特性说明
- ✅ 最佳实践建议
下一步学习
- 深入了解生产者特性
- 掌握消费者配置
- 学习高级特性应用
希望这篇文章能帮助您快速入门Kafka!如果您有任何问题,欢迎在评论区讨论。