RocketMQ基础入门
RocketMQ基础入门
前置知识
在学习本教程之前,建议您具备以下基础知识:
- Java 基础语法
- Maven 或 Gradle 构建工具
- 分布式系统基础概念
- 消息队列基本原理
什么是 RocketMQ?
RocketMQ 是阿里巴巴开源的分布式消息中间件,现已成为 Apache 顶级项目。它是一款低延迟、高可靠、高吞吐、可水平扩展的消息中间件,广泛应用于异步通信、削峰填谷、应用解耦等场景。
主要特性
- 高吞吐量:单机支持十万级的消息吞吐量
- 分布式架构:支持集群部署,可水平扩展
- 低延迟:毫秒级的消息延迟
- 高可用性:支持主从架构,保证消息不丢失
- 消息可追溯:支持消息轨迹查询
- 灵活的消息模型:支持发布/订阅、点对点等多种消息模型
- 丰富的功能:支持延时消息、事务消息、顺序消息等
RocketMQ 核心概念
1. 消息模型
RocketMQ 支持两种消息模型:
- 发布/订阅(Pub/Sub):生产者发送消息到 Topic,多个消费者组可以订阅该 Topic
- 点对点(P2P):生产者发送消息到 Topic,一个消费者组中的一个消费者消费该消息
2. 核心组件

2.1 Producer(生产者)
生产者负责生产消息,将业务系统产生的消息发送到 RocketMQ 服务器。
主要特点:
- 支持同步、异步、单向发送
- 支持批量发送
- 支持事务消息
2.2 Consumer(消费者)
消费者负责消费消息,从 RocketMQ 服务器拉取消息并进行处理。
主要特点:
- 支持 Push 和 Pull 两种消费模式
- 支持集群消费和广播消费
- 支持顺序消费和并发消费
2.3 Name Server(命名服务)
Name Server 是一个无状态的服务发现和路由组件,负责管理 Topic 的路由信息。
主要功能:
- 存储 Broker 的地址信息
- 存储 Topic 的路由信息
- 提供 Broker 的发现和故障转移
2.4 Broker(消息服务器)
Broker 负责存储和转发消息,是 RocketMQ 的核心组件。
主要功能:
- 存储消息
- 转发消息
- 提供高可用性(主从架构)
- 提供消息过滤
3. 其他重要概念
3.1 Topic(主题)
Topic 是消息的逻辑分类,生产者发送消息到指定的 Topic,消费者订阅 Topic 来消费消息。
3.2 Message Queue(消息队列)
Topic 物理上由多个 Message Queue 组成,用于存储消息。Message Queue 可以分布在不同的 Broker 上,提高并发能力。
3.3 Message(消息)
Message 是 RocketMQ 中的最小数据单元,包含消息体、消息属性和消息头。
3.4 Tag(标签)
Tag 是 RocketMQ 提供的消息子类型,用于同一 Topic 下消息的进一步过滤。
3.5 Consumer Group(消费者组)
消费者组由多个消费者实例组成,同一组内的消费者消费逻辑必须一致。
安装与配置
1. 环境要求
- 64 位操作系统
- 64 位 JDK 1.8+
- Maven 3.2.x+
- Git
2. 下载与安装
2.1 二进制包安装
# 下载二进制包
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
# 解压
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-4.9.4/
# 启动 Name Server
nohup sh bin/mqnamesrv &
# 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
2.2 Docker 安装
# 拉取镜像
docker pull apache/rocketmq:4.9.4
# 创建网络
docker network create rocketmq
# 启动 Name Server
docker run -d --name rmqnamesrv --network rocketmq -p 9876:9876 apache/rocketmq:4.9.4 sh mqnamesrv
# 启动 Broker
docker run -d --name rmqbroker --network rocketmq -p 10909:10909 -p 10911:10911 \
-v /path/to/broker.conf:/opt/rocketmq-4.9.4/conf/broker.conf \
apache/rocketmq:4.9.4 sh mqbroker -n rmqnamesrv:9876 -c /opt/rocketmq-4.9.4/conf/broker.conf
3. 基本配置
3.1 Broker 配置文件 (broker.conf)
# 集群名称
brokerClusterName=DefaultCluster
# Broker 名称
brokerName=broker-a
# Broker ID,0 表示 Master,大于 0 表示 Slave
brokerId=0
# 删除过期文件的时间点,默认凌晨 4 点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# Broker 角色,ASYNC_MASTER 表示异步复制 Master
brokerRole=ASYNC_MASTER
# 刷盘策略,ASYNC_FLUSH 表示异步刷盘
flushDiskType=ASYNC_FLUSH
# Name Server 地址
nameServer=localhost:9876
快速入门示例
1. 添加依赖
在您的 pom.xml
中添加 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
2. 生产者示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
// 设置 Name Server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
try {
// 创建消息
Message message = new Message(
"TopicTest", // Topic
"TagA", // Tag
"Hello RocketMQ".getBytes() // 消息体
);
// 发送消息
SendResult sendResult = producer.send(message);
// 打印发送结果
System.out.printf("消息发送状态: %s%n", sendResult);
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
3. 消费者示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
// 设置 Name Server 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic
consumer.subscribe("TopicTest", "*"); // * 表示订阅所有 Tag
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动,等待消息...");
}
}
消息发送方式
RocketMQ 支持三种消息发送方式:同步发送、异步发送和单向发送。
1. 同步发送
同步发送是指生产者发送消息后,等待 Broker 的响应结果。
// 同步发送示例
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(message);
System.out.printf("消息ID: %s, 发送状态: %s%n", sendResult.getMsgId(), sendResult.getSendStatus());
2. 异步发送
异步发送是指生产者发送消息后,不等待 Broker 的响应结果,而是通过回调函数处理响应。
// 异步发送示例
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("消息ID: %s, 发送状态: %s%n", sendResult.getMsgId(), sendResult.getSendStatus());
}
@Override
public void onException(Throwable e) {
System.err.printf("消息发送失败: %s%n", e.getMessage());
}
});
3. 单向发送
单向发送是指生产者发送消息后,不关心 Broker 的响应结果,适用于对可靠性要求不高的场景。
// 单向发送示例
Message message = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
producer.sendOneway(message);
消息消费模式
RocketMQ 支持两种消费模式:集群消费和广播消费。
1. 集群消费
集群消费是指同一个消费者组中的消费者平均分摊消息,每条消息只会被组内的一个消费者消费。
// 集群消费示例(默认模式)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setMessageModel(MessageModel.CLUSTERING); // 设置为集群模式(默认)
2. 广播消费
广播消费是指同一个消费者组中的每个消费者都会收到所有消息,每条消息会被组内的所有消费者消费。
// 广播消费示例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setMessageModel(MessageModel.BROADCASTING); // 设置为广播模式
常见问题
1. 如何保证消息不丢失?
RocketMQ 通过以下机制保证消息不丢失:
- 生产者端:使用同步发送或异步发送 + 重试机制
- Broker 端:开启同步刷盘或主从同步复制
- 消费者端:消费成功后再提交消费位点
// 生产者端设置重试次数
producer.setRetryTimesWhenSendFailed(3);
// Broker 端配置同步刷盘
// flushDiskType=SYNC_FLUSH
// 消费者端手动提交消费位点
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 处理消息
// ...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消费失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
2. 如何处理消息重复消费?
RocketMQ 无法保证消息不重复,需要业务层面保证消息幂等性:
- 使用唯一标识(如消息ID)进行去重
- 使用数据库唯一索引
- 使用分布式锁
// 消费者端实现幂等消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 检查消息是否已处理(使用 Redis 或数据库)
if (isProcessed(msgId)) {
continue; // 已处理,跳过
}
try {
// 处理消息
// ...
// 标记消息已处理
markAsProcessed(msgId);
} catch (Exception e) {
// 处理异常
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
总结
本文详细介绍了 RocketMQ 的基本概念、架构和核心特性:
- ✅ 核心概念:Producer、Consumer、Name Server、Broker、Topic、Message Queue 等
- ✅ 安装配置:二进制包安装和 Docker 安装
- ✅ 快速入门:生产者和消费者示例
- ✅ 消息发送方式:同步发送、异步发送、单向发送
- ✅ 消息消费模式:集群消费和广播消费
下一步学习
- 学习 RocketMQ 的高级特性(顺序消息、延时消息、事务消息等)
- 了解 RocketMQ 的集群部署和运维
- 探索 RocketMQ 在实际项目中的应用场景
希望这篇文章能帮助您快速入门 RocketMQ!如果您有任何问题,欢迎在评论区讨论。