Kafka生产者详解
2025/8/15大约 3 分钟
Apache Kafka生产者详解
前置知识
在学习本文之前,请确保您已经:
- 了解Kafka基本概念
- 掌握Java并发编程
- 熟悉Maven项目管理
生产者工作原理
1. 整体架构
Kafka生产者的消息发送流程:
- 消息封装:将消息封装为ProducerRecord
- 序列化:对key和value进行序列化
- 分区分配:选择目标分区
- 批次管理:将消息添加到批次
- 网络发送:发送到Kafka集群
2. 分区策略
消息分区的方式:
- 轮询:默认策略,均匀分布
- 随机:随机选择分区
- 按键分区:同key的消息发送到同一分区
- 自定义:实现自定义分区策略
核心配置
1. 基础配置
@Data
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
// 基础连接配置
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
// 序列化配置
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return new DefaultKafkaProducerFactory<>(config);
}
}
2. 可靠性配置
// 可靠性相关配置
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 确认机制
config.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); // 重试间隔
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
3. 性能配置
// 性能相关配置
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
config.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 等待时间
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩算法
发送消息模式
1. 发后即忘
@Slf4j
public class FireAndForgetProducer {
public void send(String topic, String message) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, message);
producer.send(record);
}
}
2. 同步发送
@Slf4j
public class SyncProducer {
public void sendSync(String topic, String message) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, message);
try {
RecordMetadata metadata = producer.send(record).get();
log.info("Message sent to partition {} offset {}",
metadata.partition(), metadata.offset());
} catch (Exception e) {
log.error("Error sending message", e);
}
}
}
3. 异步发送
@Slf4j
public class AsyncProducer {
public void sendAsync(String topic, String message) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, message);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("Message sent to partition {} offset {}",
metadata.partition(), metadata.offset());
} else {
log.error("Error sending message", exception);
}
});
}
}
高级特性
1. 自定义分区器
@Slf4j
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 自定义分区逻辑
if (keyBytes == null) {
return (int) (System.currentTimeMillis() % numPartitions);
}
return Math.abs(Arrays.hashCode(keyBytes) % numPartitions);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
2. 自定义拦截器
@Slf4j
public class LoggingProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
log.info("Sending message: {}", record.value());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("Failed to send message", exception);
}
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
最佳实践
1. 性能优化
批量处理
- 合理设置batch.size
- 适当调整linger.ms
- 启用压缩
- 调整buffer.memory
2. 可靠性保证
消息可靠性
- 设置acks=all
- 配置min.insync.replicas
- 启用幂等性
- 实现重试机制
3. 异常处理
常见异常
LeaderNotAvailableException
- 临时异常,通常自动恢复
- 配置重试机制
NotEnoughReplicasException
- 副本同步问题
- 检查集群健康状态
TimeoutException
- 网络或服务器问题
- 调整超时时间
监控指标
1. JMX监控
重要的生产者指标:
- request-rate:请求速率
- response-rate:响应速率
- request-latency-avg:请求延迟
- outgoing-byte-rate:发送字节率
- record-error-rate:错误率
2. 应用监控
@Slf4j
public class ProducerMonitor {
private final Map<String, AtomicLong> metrics = new ConcurrentHashMap<>();
public void recordSuccess(String topic) {
metrics.computeIfAbsent(topic + ".success",
k -> new AtomicLong()).incrementAndGet();
}
public void recordFailure(String topic) {
metrics.computeIfAbsent(topic + ".failure",
k -> new AtomicLong()).incrementAndGet();
}
public void recordLatency(String topic, long latency) {
// 记录延迟统计
}
}
总结
本文详细介绍了Kafka生产者的:
- ✅ 工作原理和架构
- ✅ 核心配置选项
- ✅ 消息发送模式
- ✅ 高级特性应用
- ✅ 最佳实践建议
下一步学习
- 深入了解消费者特性
- 掌握事务机制
- 学习监控运维
希望这篇文章能帮助您更好地使用Kafka生产者!如果您有任何问题,欢迎在评论区讨论。