RocketMQ高级特性详解
RocketMQ高级特性详解
前置知识
在学习本教程之前,建议您先阅读 RocketMQ基础入门 了解基本概念和使用方法。
顺序消息
在某些业务场景中,消息的消费顺序非常重要,例如订单的创建、支付、发货等状态变更消息必须按照特定顺序处理。RocketMQ 提供了顺序消息的特性来满足这一需求。
顺序消息类型
RocketMQ 支持两种顺序:
- 全局顺序:Topic 下的所有消息都按照严格的先入先出(FIFO)顺序进行发布和消费
- 分区顺序:Topic 下的消息根据 Sharding Key 进行分区,同一分区内的消息按照 FIFO 顺序进行发布和消费
顺序消息实现原理
顺序消息的实现原理如下:
- 生产者:将具有相同 Sharding Key 的消息发送到同一个 Message Queue
- Broker:保证同一个 Message Queue 中的消息按照 FIFO 顺序存储
- 消费者:从同一个 Message Queue 中按照 FIFO 顺序消费消息,并且同一时刻只允许一个线程消费该 Message Queue
顺序消息示例
1. 顺序消息生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("OrderedProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 模拟订单消息
String[] orderStatus = {"创建", "支付", "发货", "完成"};
// 模拟5个订单,每个订单有4个状态变更消息
for (int orderId = 1; orderId <= 5; orderId++) {
for (int i = 0; i < orderStatus.length; i++) {
// 创建消息
String content = "订单" + orderId + ":" + orderStatus[i];
Message message = new Message(
"OrderedTopic", // Topic
"OrderedTag", // Tag
content.getBytes() // 消息体
);
// 发送顺序消息
// 使用 orderId 作为 Sharding Key,保证同一订单的消息进入同一队列
SendResult sendResult = producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},
orderId // 作为 select 方法的参数传入
);
System.out.printf("发送消息: %s, 队列ID: %d%n",
content, sendResult.getMessageQueue().getQueueId());
// 为了演示效果,增加一些延迟
Thread.sleep(100);
}
}
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
2. 顺序消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderedConsumerGroup");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 设置从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅 Topic
consumer.subscribe("OrderedTopic", "*");
// 注册顺序消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.printf("线程: %s, 队列ID: %d, 消息: %s%n",
Thread.currentThread().getName(),
msg.getQueueId(),
new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("顺序消费者已启动,等待消息...");
}
}
延时消息
延时消息是指消息发送到 Broker 后,不会立即被消费者消费,而是在指定的时间后才能被消费。RocketMQ 支持定时消息,可以指定消息在未来某个时间点投递。
延时消息级别
RocketMQ 目前支持以下几个延时级别:
延时级别 | 时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
注意
目前 RocketMQ 不支持任意时间精度的延时,只支持上述固定的延时级别。如果需要更灵活的延时,可以通过应用层面的定时任务实现。
延时消息示例
1. 延时消息生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class DelayedProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("DelayedProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息
Message message = new Message(
"DelayedTopic", // Topic
"DelayedTag", // Tag
("这是一条延时消息,延时10秒").getBytes() // 消息体
);
// 设置延时级别为3,对应延时10秒
message.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("消息发送时间: %s%n", System.currentTimeMillis());
System.out.printf("消息ID: %s, 发送状态: %s%n",
sendResult.getMsgId(), sendResult.getSendStatus());
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
2. 延时消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DelayedConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayedConsumerGroup");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic
consumer.subscribe("DelayedTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消息接收时间: %s%n", System.currentTimeMillis());
System.out.printf("消息内容: %s%n", new String(msg.getBody()));
System.out.printf("消息存储时间: %s%n", msg.getStoreTimestamp());
System.out.printf("消息延迟时间: %s 毫秒%n", System.currentTimeMillis() - msg.getStoreTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("延时消费者已启动,等待消息...");
}
}
事务消息
事务消息是 RocketMQ 提供的一种高级特性,用于解决分布式事务问题。它可以确保本地事务和发送消息这两个操作要么都成功,要么都失败,保证数据的一致性。
事务消息流程
- 发送方发送半事务消息(Half Message)到 Broker
- Broker 接收到半事务消息后,将消息标记为"待确认"状态
- 发送方执行本地事务
- 根据本地事务执行结果,发送方向 Broker 发送确认消息(Commit 或 Rollback)
- 如果 Broker 长时间没有收到确认消息,会向发送方发起事务状态回查
事务消息示例
事务消息生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(transactionListener);
// 设置线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
producer.setExecutorService(executorService);
// 启动生产者
producer.start();
try {
for (int i = 0; i < 10; i++) {
// 创建消息
Message message = new Message(
"TransactionTopic", // Topic
"TransactionTag", // Tag
("事务消息 " + i).getBytes() // 消息体
);
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("发送事务消息 %d, 状态: %s%n", i, sendResult.getLocalTransactionState());
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 等待一段时间再关闭生产者
Thread.sleep(5000);
producer.shutdown();
executorService.shutdown();
}
}
// 事务监听器实现
static class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 模拟执行本地事务
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
System.out.printf("执行本地事务,消息: %s, 状态: %d%n", new String(msg.getBody()), status);
switch (status) {
case 0:
// 本地事务成功,提交事务消息
return LocalTransactionState.COMMIT_MESSAGE;
case 1:
// 本地事务未知,等待回查
return LocalTransactionState.UNKNOW;
case 2:
// 本地事务失败,回滚事务消息
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
Integer status = localTrans.get(msg.getTransactionId());
System.out.printf("回查本地事务,消息: %s, 状态: %d%n", new String(msg.getBody()), status);
if (status != null) {
switch (status) {
case 0:
return LocalTransactionState.COMMIT_MESSAGE;
case 1:
// 模拟回查时提交
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
事务消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionConsumerGroup");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic
consumer.subscribe("TransactionTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费事务消息: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("事务消息消费者已启动,等待消息...");
}
}
批量消息
批量消息是指将多条消息合并成一批进行发送,可以提高传输效率,减少网络开销。
批量消息限制
- 批量消息的总大小不能超过 4MB
- 批量消息中的所有消息必须具有相同的 Topic
- 批量消息中的所有消息必须具有相同的消息等待时间
- 批量消息中的所有消息不能是延时消息和事务消息
批量消息示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息列表
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// 创建消息
Message message = new Message(
"BatchTopic", // Topic
"BatchTag", // Tag
("批量消息 " + i).getBytes() // 消息体
);
messages.add(message);
}
// 批量发送消息
SendResult sendResult = producer.send(messages);
System.out.printf("批量消息发送状态: %s%n", sendResult.getSendStatus());
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
消息过滤
RocketMQ 支持两种消息过滤方式:Tag 过滤和 SQL 过滤。
1. Tag 过滤
Tag 过滤是最常用的过滤方式,消费者可以订阅指定的 Tag 来过滤消息。
// 生产者设置 Tag
Message message = new Message("FilterTopic", "TagA", "Hello RocketMQ".getBytes());
// 消费者订阅指定 Tag
consumer.subscribe("FilterTopic", "TagA || TagB"); // 订阅 TagA 或 TagB
2. SQL 过滤
SQL 过滤是一种更灵活的过滤方式,消费者可以使用 SQL 表达式来过滤消息。
注意
使用 SQL 过滤需要在 Broker 配置文件中开启 SQL 过滤功能:enablePropertyFilter=true
SQL 过滤示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息并设置属性
Message message = new Message(
"SqlFilterTopic", // Topic
"SqlFilterTag", // Tag
"这是一条带属性的消息".getBytes() // 消息体
);
// 设置消息属性
message.putUserProperty("age", "25");
message.putUserProperty("region", "北京");
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("消息发送状态: %s%n", sendResult.getSendStatus());
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumerGroup");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 使用 SQL 过滤表达式订阅 Topic
consumer.subscribe(
"SqlFilterTopic",
MessageSelector.bySql("age >= 18 AND region = '北京'")
);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息: %s, 属性: age=%s, region=%s%n",
new String(msg.getBody()),
msg.getUserProperty("age"),
msg.getUserProperty("region"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("SQL 过滤消费者已启动,等待消息...");
}
}
消息轨迹
RocketMQ 4.5.0 版本开始支持消息轨迹功能,可以记录消息从生产到消费的完整链路,方便问题排查和系统监控。
开启消息轨迹
1. 生产者开启消息轨迹
// 创建带轨迹的生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
2. 消费者开启消息轨迹
// 创建带轨迹的消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);
消息重试
RocketMQ 支持消息重试机制,当消费者消费失败时,可以将消息重新投递给消费者进行重试。
消息重试策略
1. 顺序消息的重试
对于顺序消息,消费失败后,RocketMQ 会无限次重试,直到消费成功或人工干预。
// 顺序消息消费失败后,返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 表示稍后重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
2. 普通消息的重试
对于普通消息,消费失败后,RocketMQ 会按照以下策略进行重试:
- 重试次数:16 次
- 重试时间间隔:10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
// 普通消息消费失败后,返回 RECONSUME_LATER 表示稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
3. 自定义重试次数
// 设置最大重试次数
consumer.setMaxReconsumeTimes(5);
消息幂等性
RocketMQ 无法保证消息不重复,需要业务层面保证消息幂等性。以下是几种常见的幂等性实现方式:
1. 使用唯一标识
// 消费者端实现幂等消费
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 使用 Redis 检查消息是否已处理
if (redisTemplate.hasKey("processed:" + msgId)) {
// 消息已处理,跳过
continue;
}
try {
// 处理消息
// ...
// 标记消息已处理,设置过期时间
redisTemplate.opsForValue().set("processed:" + msgId, "1", 7, TimeUnit.DAYS);
} catch (Exception e) {
// 处理异常
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2. 使用数据库唯一索引
// 使用数据库唯一索引实现幂等性
public void processOrder(Order order) {
try {
// 插入订单,依赖唯一索引约束
orderMapper.insert(order);
} catch (DuplicateKeyException e) {
// 订单已存在,忽略
log.info("订单 {} 已存在,忽略", order.getOrderId());
}
}
最佳实践
1. 生产者最佳实践
- 设置重试次数:对于重要的消息,设置合理的重试次数
- 批量发送:合理使用批量发送提高吞吐量
- 异步发送:对于对延迟不敏感的场景,使用异步发送提高性能
- 消息压缩:对于大消息,考虑使用压缩算法减小消息体积
// 设置生产者重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置发送超时时间
producer.setSendMsgTimeout(3000);
// 压缩消息
Message message = new Message("CompressTopic", "CompressTag", "大消息内容".getBytes());
message.setCompressed(true);
2. 消费者最佳实践
- 合理设置消费线程数:根据业务处理能力设置合适的消费线程数
- 批量消费:合理使用批量消费提高吞吐量
- 消费者负载均衡:合理设置消费者实例数量,实现负载均衡
- 消费者重试:对于可重试的异常,返回 RECONSUME_LATER 状态
// 设置消费线程数
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
// 设置批量消费最大消息数
consumer.setConsumeMessageBatchMaxSize(10);
3. 集群部署最佳实践
- 多主多从:部署多个 Master 和 Slave 实现高可用
- 异步刷盘:对于性能要求高的场景,使用异步刷盘
- 同步双写:对于可靠性要求高的场景,使用同步双写
- 合理设置内存:根据消息量设置合适的内存大小
常见问题
1. 如何解决消息堆积问题?
消息堆积是指消息生产速度大于消费速度,导致消息在 Broker 中积压。解决方法:
- 增加消费者数量:增加消费者实例或消费线程数
- 提高消费者处理能力:优化消费者代码,提高单条消息处理速度
- 批量消费:使用批量消费模式,减少网络开销
- 使用更多队列:增加 Topic 的队列数,提高并行度
- 临时扩容:紧急情况下,可以部署临时消费者快速消费堆积消息
// 增加消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(50);
// 增加批量消费数量
consumer.setConsumeMessageBatchMaxSize(32);
2. 如何保证消息不丢失?
RocketMQ 通过以下机制保证消息不丢失:
生产者端:
- 使用同步发送或异步发送 + 重试机制
- 使用事务消息确保本地事务和消息发送的一致性
Broker 端:
- 开启同步刷盘(
flushDiskType=SYNC_FLUSH
) - 开启主从同步复制(
brokerRole=SYNC_MASTER
)
- 开启同步刷盘(
消费者端:
- 消费成功后再提交消费位点
- 对于重要消息,确保业务处理成功后再返回 CONSUME_SUCCESS
// 生产者端设置重试次数
producer.setRetryTimesWhenSendFailed(5);
// 消费者端确保处理成功后再提交
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 处理消息
// ...
// 处理成功后返回 CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 处理失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
总结
本文详细介绍了 RocketMQ 的高级特性:
- ✅ 顺序消息:保证消息的消费顺序
- ✅ 延时消息:指定消息在未来某个时间点投递
- ✅ 事务消息:解决分布式事务问题
- ✅ 批量消息:提高传输效率
- ✅ 消息过滤:支持 Tag 过滤和 SQL 过滤
- ✅ 消息轨迹:记录消息从生产到消费的完整链路
- ✅ 消息重试:消费失败后重新投递
- ✅ 消息幂等性:保证消息不重复消费
下一步学习
- 学习 RocketMQ 的集群部署和运维
- 了解 RocketMQ 与 Spring Boot 的集成
- 探索 RocketMQ 在实际项目中的应用场景
希望这篇文章能帮助您深入了解 RocketMQ 的高级特性!如果您有任何问题,欢迎在评论区讨论。