RocketMQ源码分析与性能优化
RocketMQ源码分析与性能优化
源码结构概览
RocketMQ 的源码结构清晰,模块划分合理,便于理解和维护。下面我们来了解 RocketMQ 的主要模块及其功能。
核心模块
模块名称 | 功能描述 |
---|---|
rocketmq-client | 客户端API,包括生产者和消费者API |
rocketmq-broker | 消息中转角色,负责存储消息,转发消息 |
rocketmq-namesrv | 命名服务,路由管理 |
rocketmq-store | 存储实现,包括commitlog、consumequeue等 |
rocketmq-common | 公共代码,各个模块都会依赖 |
rocketmq-remoting | 远程通信模块,基于Netty |
rocketmq-tools | 命令行工具 |
rocketmq-filter | 消息过滤器 |
rocketmq-acl | 权限控制模块 |
rocketmq-logging | 日志模块 |
rocketmq-srvutil | 服务工具类 |
源码目录结构
rocketmq/
├── broker # Broker实现
├── client # 客户端实现
├── common # 公共代码
├── dev # 开发者信息
├── distribution # 部署分发相关
├── example # 示例代码
├── filter # 消息过滤器
├── logging # 日志模块
├── namesrv # NameServer实现
├── openmessaging # 开放消息标准
├── remoting # 远程通信模块
├── srvutil # 服务工具类
├── store # 存储实现
├── style # 代码风格
├── test # 测试代码
└── tools # 工具类
核心组件源码分析
1. NameServer 源码分析
NameServer 是 RocketMQ 的路由注册中心,负责管理 Broker 的路由信息。下面我们来分析 NameServer 的核心源码。
1.1 NameServer 启动流程
NameServer 的启动入口在 org.apache.rocketmq.namesrv.NamesrvStartup
类中:
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创建并启动 NameServer 控制器
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
createNamesrvController
方法负责创建 NameServer 控制器:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 解析命令行参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建 NameServer 配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876); // 默认端口 9876
// 解析配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
in.close();
}
}
// 创建 NameServer 控制器
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 注册钩子函数,优雅关闭
controller.registerShutdownHook();
return controller;
}
start
方法负责启动 NameServer 控制器:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null != controller) {
// 初始化控制器
controller.initialize();
// 启动控制器
controller.start();
}
return controller;
}
1.2 NameServer 路由管理
NameServer 的核心功能是路由管理,实现在 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
类中:
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
// 注册 Broker
public RegisterBrokerResult registerBroker(...) {
// ...
}
// 注销 Broker
public void unregisterBroker(...) {
// ...
}
// 根据 Topic 查询路由信息
public TopicRouteData pickupTopicRouteData(final String topic) {
// ...
}
// 扫描不活跃的 Broker
public void scanNotActiveBroker() {
// ...
}
}
NameServer 通过以下几个重要的数据结构来管理路由信息:
topicQueueTable
:Topic 队列路由信息,消息发送时根据路由表进行负载均衡brokerAddrTable
:Broker 物理地址信息,包含 brokerName 到 Broker 地址的映射clusterAddrTable
:集群信息,包含集群名称到 Broker 名称的映射brokerLiveTable
:Broker 状态信息,包含 Broker 地址到 Broker 状态的映射filterServerTable
:消息过滤服务器地址信息
2. Broker 源码分析
Broker 是 RocketMQ 的核心组件,负责消息的存储和转发。下面我们来分析 Broker 的核心源码。
2.1 Broker 启动流程
Broker 的启动入口在 org.apache.rocketmq.broker.BrokerStartup
类中:
public static void main(String[] args) {
start(createBrokerController(args));
}
public static BrokerController createBrokerController(String[] args) {
// 解析命令行参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建 Broker 配置
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// 解析配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
in.close();
}
}
// 创建 Broker 控制器
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// 注册钩子函数,优雅关闭
controller.registerShutdownHook();
return controller;
}
public static BrokerController start(final BrokerController controller) {
try {
// 初始化控制器
controller.initialize();
// 启动控制器
controller.start();
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return controller;
}
2.2 Broker 消息处理流程
Broker 处理消息的核心流程在 org.apache.rocketmq.broker.processor.SendMessageProcessor
类中:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// 处理请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
SendMessageContext sendMessageContext = null;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
// 处理消息发送请求
return this.sendMessage(ctx, request, sendMessageContext, requestHeader);
}
}
// 发送消息
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request,
final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) {
// 创建响应
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
// 检查消息是否合法
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
// 处理消息
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimestamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
}
// 检查 Topic 是否合法
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 自动创建 Topic
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(),
topicSysFlag);
}
// 检查写权限
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
|| !PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
// 检查队列是否合法
int queueIdInt = requestHeader.getQueueId();
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
if (queueIdInt >= topicConfig.getWriteQueueNums()) {
String errorInfo = String.format("queueId[%d] is illegal, topicConfig.writeQueueNums=%d", queueIdInt, topicConfig.getWriteQueueNums());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
// 构建消息内部属性
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
// ...
// 存储消息
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
// 处理存储结果
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
// 存储成功
break;
case FLUSH_DISK_TIMEOUT:
// 刷盘超时
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
response.setRemark("FLUSH_DISK_TIMEOUT");
break;
// ...
}
// 设置响应
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} else {
// 存储失败
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
}
}
}
3. 存储模块源码分析
RocketMQ 的存储模块是其核心组件之一,负责消息的持久化存储。下面我们来分析存储模块的核心源码。
3.1 存储架构
RocketMQ 的存储架构如下图所示:
+-------------------+
| CommitLog | 消息主体以及元数据的存储主体
+-------------------+
| ConsumeQueue | 消息消费队列,引用了 CommitLog 中的消息
+-------------------+
| IndexFile | 消息索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法
+-------------------+
3.2 CommitLog 源码分析
CommitLog 是 RocketMQ 存储的核心,所有的消息都存储在 CommitLog 中。下面我们来分析 CommitLog 的核心源码。
org.apache.rocketmq.store.CommitLog
类负责消息的存储:
public class CommitLog {
// 消息存储
private final DefaultMessageStore defaultMessageStore;
// 映射文件队列
private final MappedFileQueue mappedFileQueue;
// 追加消息回调
private final AppendMessageCallback appendMessageCallback;
// 提交日志调度服务
private final CommitLogService commitLogService;
// 构造函数
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.defaultMessageStore = defaultMessageStore;
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
this.commitLogService = new CommitLogService();
}
// 存储消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 消息检查
if (msg.getTopic().length() > Byte.MAX_VALUE) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
// 获取存储锁
PutMessageLock putMessageLock = this.defaultMessageStore.getPutMessageLock();
putMessageLock.lock();
try {
// 获取最后一个映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (null == mappedFile || mappedFile.isFull()) {
// 创建新的映射文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
if (null == mappedFile) {
// 创建映射文件失败
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// 追加消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
// 追加成功
break;
case END_OF_FILE:
// 文件已满,创建新文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
// 消息大小超过限制
return new PutMessageResult(result.getStatus(), result);
default:
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
// 处理刷盘
handleDiskFlush(result, putMessageResult, msg);
// 处理主从复制
handleHA(result, putMessageResult, msg);
return putMessageResult;
} finally {
putMessageLock.unlock();
}
}
// 处理刷盘
private void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.commitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// 同步等待刷盘完成
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
// 异步刷盘
service.wakeup();
}
}
// 异步刷盘
else {
this.flushCommitLogService.wakeup();
}
}
// 处理主从复制
private void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步复制
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// 同步等待复制完成
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave 不可用
else {
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
}
3.3 ConsumeQueue 源码分析
ConsumeQueue 是消息的逻辑队列,类似于索引文件,存储了指向物理存储的地址。下面我们来分析 ConsumeQueue 的核心源码。
org.apache.rocketmq.store.ConsumeQueue
类负责消息队列的管理:
public class ConsumeQueue {
// 每个条目的大小:8 字节物理偏移量 + 4 字节消息大小 + 8 字节 Tag 哈希码
public static final int CQ_STORE_UNIT_SIZE = 20;
// 消息存储
private final DefaultMessageStore defaultMessageStore;
// 主题
private final String topic;
// 队列 ID
private final int queueId;
// 映射文件队列
private final MappedFileQueue mappedFileQueue;
// 存储路径
private final String storePath;
// 扩展文件路径
private final String extStorePath;
// 最后一个消费队列的物理偏移量
private long maxPhysicOffset = -1;
// 最小逻辑偏移量
private volatile long minLogicOffset = 0;
// 构造函数
public ConsumeQueue(final String topic, final int queueId, final String storePath, final String extStorePath,
final int mappedFileSize, final DefaultMessageStore defaultMessageStore) {
this.topic = topic;
this.queueId = queueId;
this.storePath = storePath;
this.extStorePath = extStorePath;
this.defaultMessageStore = defaultMessageStore;
this.mappedFileQueue = new MappedFileQueue(storePath + File.separator + topic + File.separator + queueId,
mappedFileSize, null);
}
// 加载
public boolean load() {
boolean result = this.mappedFileQueue.load();
return result;
}
// 恢复
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 从倒数第三个文件开始恢复
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
// 读取消息
for (int i = 0; i < mappedFile.getFileSize(); i += CQ_STORE_UNIT_SIZE) {
byteBuffer.position(i);
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
// 有效性检查
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
this.maxPhysicOffset = offset + size;
} else {
break;
}
}
// 处理下一个文件
if (mappedFileOffset == mappedFile.getFileSize() && index < mappedFiles.size() - 1) {
index++;
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
continue;
}
// 恢复完成
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
break;
}
}
}
// 添加消息索引
public boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) {
// 构建消息索引
final int totalSize = CQ_STORE_UNIT_SIZE;
// 获取或创建映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(cqOffset * CQ_STORE_UNIT_SIZE);
if (mappedFile == null) {
return false;
}
// 写入消息索引
byteBuffer.putLong(offset);
byteBuffer.putInt(size);
byteBuffer.putLong(tagsCode);
// 更新最大物理偏移量
this.maxPhysicOffset = offset + size;
return true;
}
// 获取消息索引
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
// 计算物理偏移量
long offset = startIndex * CQ_STORE_UNIT_SIZE;
// 获取映射文件
return this.mappedFileQueue.getIndexBuffer(offset);
}
}
4. 客户端源码分析
4.1 Producer 源码分析
Producer 是消息生产者,负责发送消息到 Broker。下面我们来分析 Producer 的核心源码。
org.apache.rocketmq.client.producer.DefaultMQProducer
类是 Producer 的默认实现:
public class DefaultMQProducer extends ClientConfig implements MQProducer {
// 生产者组
private String producerGroup;
// 默认主题队列数
private volatile int defaultTopicQueueNums = 4;
// 发送消息超时时间
private int sendMsgTimeout = 3000;
// 压缩消息阈值
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 重试次数
private int retryTimesWhenSendFailed = 2;
// 异步发送重试次数
private int retryTimesWhenSendAsyncFailed = 2;
// 是否重试另一个 Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大消息大小
private int maxMessageSize = 1024 * 1024 * 4; // 4M
// 生产者实现
private final InternalProducer defaultProducer;
// 构造函数
public DefaultMQProducer() {
this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}
public DefaultMQProducer(final String producerGroup) {
this(producerGroup, null);
}
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
this.defaultProducer = new DefaultMQProducerImpl(this, rpcHook);
}
// 启动生产者
@Override
public void start() throws MQClientException {
this.defaultProducer.start();
}
// 关闭生产者
@Override
public void shutdown() {
this.defaultProducer.shutdown();
}
// 同步发送消息
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultProducer.send(msg);
}
// 异步发送消息
@Override
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
this.defaultProducer.send(msg, sendCallback);
}
// 单向发送消息
@Override
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
this.defaultProducer.sendOneway(msg);
}
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
类是 Producer 的核心实现:
public class DefaultMQProducerImpl implements MQProducerInner {
// 发送消息
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
// 发送消息,带超时时间
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
// 发送消息实现
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 检查生产者状态
this.makeSureStateOK();
// 检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
// 获取主题发布信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 选择消息队列
MessageQueue mq = null;
// 发送结果
SendResult sendResult = null;
// 异常
Exception exception = null;
// 重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
// 已重试次数
int times = 0;
// 选择的 Broker 地址
String[] brokersSent = new String[timesTotal];
// 重试发送
for (; times < timesTotal; times++) {
// 选择消息队列
mq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mq != null) {
try {
// 发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
// 发送成功
if (sendResult != null) {
return sendResult;
}
} catch (MQClientException e) {
exception = e;
continue;
} catch (RemotingException e) {
exception = e;
continue;
} catch (MQBrokerException e) {
exception = e;
// 根据错误码判断是否需要重试
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK() && e.getResponseCode() == ResponseCode.FLUSH_DISK_TIMEOUT) {
continue;
}
} catch (InterruptedException e) {
exception = e;
}
}
}
// 发送失败,抛出异常
if (exception != null) {
throw exception;
}
// 未知错误
throw new MQClientException("No route info of this topic, " + msg.getTopic(), null);
}
// 未找到主题发布信息
throw new MQClientException("No route info of this topic, " + msg.getTopic(), null);
}
// 发送消息核心实现
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode,
final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 获取 Broker 地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
// 更新主题路由信息
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
// 构建发送消息请求
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// 发送消息
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
// 异步发送
this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
return null;
case ONEWAY:
case SYNC:
// 同步发送
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
return sendResult;
}
}
4.2 Consumer 源码分析
Consumer 是消息消费者,负责从 Broker 拉取消息并消费。下面我们来分析 Consumer 的核心源码。
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
类是 Push 模式的 Consumer 默认实现:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
// 消费者组
private String consumerGroup;
// 消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
// 消费起始位置
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 消费线程数
private int consumeThreadMin = 20;
private int consumeThreadMax = 64;
// 批量消费数量
private int consumeMessageBatchMaxSize = 1;
// 消费者实现
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 构造函数
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
this.defaultMQPushConsumerImpl.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
}
// 启动消费者
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
// 关闭消费者
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
}
// 订阅主题
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
// 注册消息监听器
@Override
public void registerMessageListener(MessageListener messageListener) {
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
}
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
类是 Push 模式的 Consumer 核心实现:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// 启动消费者
public void start() throws MQClientException {
// 检查配置
this.checkConfig();
// 复制订阅关系
this.copySubscription();
// 设置消费者组
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 初始化重平衡服务
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 初始化拉取API
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册消费者
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please.", null);
}
// 启动MQ客户端
mQClientFactory.start();
// 启动消费服务
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) messageListener);
this.consumeMessageService.start();
}
// 拉取消息
public void pullMessage(final PullRequest pullRequest) {
// 获取消费处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 检查处理队列是否被丢弃
if (processQueue.isDropped()) {
return;
}
// 设置拉取中状态
processQueue.setPullMsgEvent(createPullMsgEvent(pullRequest));
// 构建拉取消息回调
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// 处理拉取结果
switch (pullResult.getPullStatus()) {
case FOUND:
// 找到消息
List<MessageExt> msgList = pullResult.getMsgFoundList();
// 处理消息
processQueue.putMessage(msgList);
// 提交消费请求
consumeMessageService.submitConsumeRequest(msgList, processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
break;
case NO_NEW_MSG:
// 没有新消息
break;
case NO_MATCHED_MSG:
// 没有匹配的消息
break;
case OFFSET_ILLEGAL:
// 偏移量非法
break;
default:
break;
}
// 继续拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
}
@Override
public void onException(Throwable e) {
// 拉取异常,稍后重试
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
// 拉取消息
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
this.defaultMQPushConsumer.getPullInterval(),
this.defaultMQPushConsumer.getConsumerPullTimeoutMillis(),
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
// 拉取异常,稍后重试
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
}
性能优化
1. 生产者性能优化
1.1 批量发送
批量发送可以有效提高生产者的吞吐量,减少网络开销。下面是批量发送的示例代码:
public void batchSend() {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
// 创建消息列表
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message msg = new Message(
"BatchTopic",
"TagA",
("Hello RocketMQ " + i).getBytes()
);
messages.add(msg);
}
// 批量发送消息
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
1.2 异步发送
对于对延迟不敏感的场景,可以使用异步发送提高吞吐量:
public void asyncSend() {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducerGroup");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
// 设置重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);
// 异步发送消息
for (int i = 0; i < 100; i++) {
final int index = i;
Message msg = new Message(
"AsyncTopic",
"TagA",
"OrderID188",
("Hello RocketMQ " + i).getBytes()
);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Message sent successfully, index=" + index + ", msgId=" + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.println("Message sent failed, index=" + index + ", error=" + e.getMessage());
}
});
}
// 等待所有异步发送任务完成
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
1.3 生产者参数优化
public void optimizeProducer() {
DefaultMQProducer producer = new DefaultMQProducer("OptimizedProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 设置发送消息超时时间,默认3000ms
producer.setSendMsgTimeout(5000);
// 设置重试次数,默认2次
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置压缩消息阈值,默认4K
producer.setCompressMsgBodyOverHowmuch(8 * 1024);
// 设置最大消息大小,默认4M
producer.setMaxMessageSize(8 * 1024 * 1024);
// 设置发送失败时是否重试另一个Broker,默认false
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
try {
producer.start();
// 发送消息
// ...
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
2. 消费者性能优化
2.1 并发消费
对于对消息顺序没有要求的场景,可以使用并发消费提高消费速度:
public void concurrentConsume() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConcurrentConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
try {
// 订阅主题
consumer.subscribe("ConcurrentTopic", "*");
// 设置消费线程数,默认20
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 设置批量消费数量,默认1
consumer.setConsumeMessageBatchMaxSize(32);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 批量处理消息
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
} catch (Exception e) {
e.printStackTrace();
}
}
2.2 消费者参数优化
public void optimizeConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OptimizedConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
try {
// 订阅主题
consumer.subscribe("OptimizedTopic", "*");
// 设置消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 设置批量消费数量
consumer.setConsumeMessageBatchMaxSize(32);
// 设置拉取消息间隔,默认0ms
consumer.setPullInterval(0);
// 设置拉取消息数量,默认32
consumer.setPullBatchSize(32);
// 设置拉取消息超时时间,默认10000ms
consumer.setConsumerPullTimeoutMillis(10000);
// 设置消费超时时间,默认15分钟
consumer.setConsumeTimeout(15);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 批量处理消息
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
} catch (Exception e) {
e.printStackTrace();
}
}
3. Broker 性能优化
3.1 刷盘策略优化
RocketMQ 支持同步刷盘和异步刷盘两种策略,可以根据业务需求进行选择:
- 同步刷盘:消息写入磁盘后才返回成功响应,可靠性高,性能较低
- 异步刷盘:消息写入内存后就返回成功响应,后台线程异步刷盘,性能高,可靠性较低
在 Broker 配置文件中设置刷盘策略:
# 同步刷盘
flushDiskType=SYNC_FLUSH
# 异步刷盘
flushDiskType=ASYNC_FLUSH
3.2 主从复制策略优化
RocketMQ 支持同步复制和异步复制两种策略,可以根据业务需求进行选择:
- 同步复制:Master 向 Slave 同步数据完成后才返回成功响应给客户端,可靠性高,性能较低
- 异步复制:Master 向 Slave 异步复制数据,不等待复制完成就返回成功响应给客户端,性能高,可靠性较低
在 Broker 配置文件中设置复制策略:
# 同步复制
brokerRole=SYNC_MASTER
# 异步复制
brokerRole=ASYNC_MASTER
3.3 线程池优化
RocketMQ 使用多个线程池处理不同类型的请求,可以根据业务需求调整线程池大小:
# 发送消息线程池大小,默认16
sendMessageThreadPoolNums=32
# 处理消息线程池大小,默认16
processReplyMessageThreadPoolNums=32
# 拉取消息线程池大小,默认16
pullMessageThreadPoolNums=32
# 查询消息线程池大小,默认8
queryMessageThreadPoolNums=16
# 客户端管理线程池大小,默认16
clientManageThreadPoolNums=32
# 管理线程池大小,默认16
adminBrokerThreadPoolNums=16
# 消费者管理线程池大小,默认16
consumerManageThreadPoolNums=32
3.4 存储优化
# 增加 CommitLog 刷盘间隔,减少 I/O 次数,默认500ms
flushIntervalCommitLog=500
# 增加 ConsumeQueue 刷盘间隔,减少 I/O 次数,默认1000ms
flushIntervalConsumeQueue=1000
# 增加 PageCache 锁定内存,默认true
lockInStrictMode=true
# 增加传输数据缓冲区大小,默认true
transferMsgByHeap=false
tranceTransactionMsgByHeap=false
4. JVM 优化
RocketMQ 是基于 Java 开发的,JVM 参数的优化对性能有很大影响。下面是一些常用的 JVM 优化参数:
# 设置堆内存大小
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 使用 G1 垃圾收集器
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30"
# 开启 GC 日志
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
# 禁止快速抛出异常
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
# 预分配内存
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
# 设置直接内存大小
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
5. 操作系统优化
操作系统参数的优化也对 RocketMQ 的性能有很大影响。下面是一些常用的操作系统优化参数:
# 修改系统限制
cat > /etc/security/limits.conf << EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 131072
* hard nproc 131072
EOF
# 修改内核参数
cat > /etc/sysctl.conf << EOF
net.core.somaxconn=32768
net.core.netdev_max_backlog=65536
net.core.rmem_max=16777216
net.core.wmem_max=16777216
net.ipv4.tcp_max_syn_backlog=8096
net.ipv4.tcp_wmem=4096 65536 16777216
net.ipv4.tcp_rmem=4096 65536 16777216
net.ipv4.tcp_slow_start_after_idle=0
vm.max_map_count=262144
vm.swappiness=10
EOF
# 应用配置
sysctl -p
性能测试
1. 性能测试工具
RocketMQ 提供了性能测试工具,可以用来测试生产者和消费者的性能。下面是一些常用的性能测试命令:
1.1 生产者性能测试
# 测试生产者性能
sh bin/mqbenchmark producer -t TestTopic -n localhost:9876 -s 128 -c 16 -r 16
参数说明:
-t
:主题名称-n
:NameServer 地址-s
:消息大小,单位字节-c
:生产者数量-r
:每个生产者的发送线程数
1.2 消费者性能测试
# 测试消费者性能
sh bin/mqbenchmark consumer -t TestTopic -n localhost:9876 -g TestGroup -c 16 -r 16
参数说明:
-t
:主题名称-n
:NameServer 地址-g
:消费者组名称-c
:消费者数量-r
:每个消费者的消费线程数
2. 性能测试指标
2.1 生产者性能指标
- TPS:每秒处理的消息数
- RT:消息发送的平均响应时间
- 吞吐量:每秒发送的消息大小
2.2 消费者性能指标
- TPS:每秒处理的消息数
- RT:消息消费的平均响应时间
- 吞吐量:每秒消费的消息大小
3. 性能测试案例
3.1 单机性能测试
测试环境:
- CPU:8核
- 内存:16GB
- 磁盘:SSD
- 网络:千兆网卡
测试结果:
- 生产者 TPS:10万/秒
- 消费者 TPS:10万/秒
- 生产者 RT:1ms
- 消费者 RT:5ms
3.2 集群性能测试
测试环境:
- 3个 NameServer 节点
- 3个 Master Broker 节点
- 3个 Slave Broker 节点
- 每个节点配置:16核 CPU,32GB 内存,SSD 磁盘,千兆网卡
测试结果:
- 生产者 TPS:50万/秒
- 消费者 TPS:50万/秒
- 生产者 RT:2ms
- 消费者 RT:10ms
性能调优案例
1. 生产者性能调优案例
1.1 问题描述
生产环境中,生产者发送消息的 TPS 只有 5000/秒,远低于预期的 10000/秒。
1.2 问题分析
通过分析生产者日志和监控数据,发现以下问题:
- 生产者发送消息时使用的是同步发送模式
- 生产者发送消息的超时时间设置过短,导致频繁超时重试
- 生产者没有使用批量发送,每次只发送一条消息
1.3 解决方案
- 对于对延迟不敏感的场景,使用异步发送模式
- 增加发送消息的超时时间,减少超时重试
- 使用批量发送,每次发送多条消息
// 优化前
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("Topic", "Tag", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
// 优化后
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(5000);
producer.start();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message msg = new Message("Topic", "Tag", ("Hello RocketMQ " + i).getBytes());
messages.add(msg);
}
producer.send(messages, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Send success");
}
@Override
public void onException(Throwable e) {
System.out.println("Send failed");
}
});
1.4 优化效果
优化后,生产者发送消息的 TPS 提升到 15000/秒,超过预期的 10000/秒。
2. 消费者性能调优案例
2.1 问题描述
生产环境中,消费者消费消息的 TPS 只有 3000/秒,导致消息堆积严重。
2.2 问题分析
通过分析消费者日志和监控数据,发现以下问题:
- 消费者的消费线程数设置过小,默认为 20
- 消费者的批量消费数量设置过小,默认为 1
- 消费者处理消息的逻辑过于复杂,包含多次数据库操作和远程调用
2.3 解决方案
- 增加消费者的消费线程数
- 增加消费者的批量消费数量
- 优化消费者处理消息的逻辑,减少数据库操作和远程调用
// 优化前
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理消息,包含多次数据库操作和远程调用
processMessage(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// 优化后
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("Topic", "*");
// 增加消费线程数
consumer.setConsumeThreadMin(50);
consumer.setConsumeThreadMax(100);
// 增加批量消费数量
consumer.setConsumeMessageBatchMaxSize(32);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 批量处理消息,减少数据库操作和远程调用
batchProcessMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
2.4 优化效果
优化后,消费者消费消息的 TPS 提升到 10000/秒,消息堆积问题得到解决。
3. Broker 性能调优案例
3.1 问题描述
生产环境中,Broker 的 CPU 使用率过高,导致消息处理延迟增加。
3.2 问题分析
通过分析 Broker 日志和监控数据,发现以下问题:
- Broker 使用同步刷盘和同步复制模式,导致 CPU 使用率过高
- Broker 的线程池大小设置过小,导致请求处理延迟增加
- Broker 的 JVM 参数设置不合理,导致频繁 GC
3.3 解决方案
- 根据业务需求,调整刷盘和复制模式
- 增加 Broker 的线程池大小
- 优化 Broker 的 JVM 参数
# 优化前
flushDiskType=SYNC_FLUSH
brokerRole=SYNC_MASTER
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
# 优化后
flushDiskType=ASYNC_FLUSH
brokerRole=ASYNC_MASTER
sendMessageThreadPoolNums=32
pullMessageThreadPoolNums=32
# 优化前
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:+UseParallelGC"
# 优化后
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30"
3.4 优化效果
优化后,Broker 的 CPU 使用率降低到 50%,消息处理延迟减少到 1ms。
最佳实践
1. 生产者最佳实践
- 批量发送:尽量使用批量发送,提高吞吐量
- 异步发送:对于对延迟不敏感的场景,使用异步发送
- 合理设置超时时间:根据网络环境和业务需求,设置合理的超时时间
- 合理设置重试次数:根据业务需求,设置合理的重试次数
- 使用消息压缩:对于大消息,使用消息压缩减少网络传输量
2. 消费者最佳实践
- 并发消费:对于对消息顺序没有要求的场景,使用并发消费
- 批量消费:尽量使用批量消费,提高吞吐量
- 合理设置消费线程数:根据业务需求和机器配置,设置合理的消费线程数
- 消费者幂等性:确保消费者处理消息的幂等性,避免重复消费问题
- 消费者负载均衡:合理设置消费者数量,避免单个消费者负载过高
3. Broker 最佳实践
- 合理设置刷盘和复制模式:根据业务需求,设置合理的刷盘和复制模式
- 合理设置线程池大小:根据业务需求和机器配置,设置合理的线程池大小
- 合理设置 JVM 参数:根据机器配置,设置合理的 JVM 参数
- 使用 SSD 存储:使用 SSD 存储提高 I/O 性能
- 定期清理过期消息:定期清理过期消息,避免磁盘空间不足
总结
本文详细介绍了 RocketMQ 的源码分析和性能优化:
- ✅ 源码结构概览:核心模块、源码目录结构
- ✅ 核心组件源码分析:NameServer、Broker、存储模块、客户端
- ✅ 性能优化:生产者性能优化、消费者性能优化、Broker 性能优化、JVM 优化、操作系统优化
- ✅ 性能测试:性能测试工具、性能测试指标、性能测试案例
- ✅ 性能调优案例:生产者性能调优案例、消费者性能调优案例、Broker 性能调优案例
- ✅ 最佳实践:生产者最佳实践、消费者最佳实践、Broker 最佳实践
下一步学习
- 学习 RocketMQ 的高级特性,如事务消息、延时消息等
- 了解 RocketMQ 的源码实现原理
- 探索 RocketMQ 在大规模分布式系统中的应用
希望这篇文章能帮助您更好地理解 RocketMQ 的源码和性能优化!如果您有任何问题,欢迎在评论区讨论。