RocketMQ与Spring Boot集成
RocketMQ与Spring Boot集成指南
概述
Spring Boot 为 RocketMQ 提供了良好的支持,通过 rocketmq-spring-boot-starter
可以快速集成 RocketMQ,简化开发流程。本文将详细介绍如何在 Spring Boot 项目中集成 RocketMQ,实现消息的生产和消费。
环境准备
1. 依赖引入
在 pom.xml
中添加 RocketMQ Spring Boot Starter 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
2. 配置文件
在 application.properties
或 application.yml
中添加 RocketMQ 相关配置:
# application.yml
server:
port: 8080
rocketmq:
name-server: localhost:9876 # RocketMQ Name Server 地址
producer:
group: my-producer-group # 生产者组名
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数
max-message-size: 4194304 # 消息最大长度,默认4M
compress-message-body-threshold: 4096 # 消息压缩阈值,默认4K
retry-next-server: true # 是否在内部发送失败时重试另一个broker
消息实体类
创建一个简单的消息实体类,用于传输数据:
package com.example.rocketmq.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
private String content;
private Long timestamp;
}
生产者实现
1. 基本生产者
创建一个基本的消息生产者服务:
package com.example.rocketmq.producer;
import com.example.rocketmq.model.MessageDTO;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class MessageProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final String TOPIC = "spring-boot-topic";
/**
* 同步发送消息
*/
public SendResult syncSend(MessageDTO message) {
return rocketMQTemplate.syncSend(TOPIC, message);
}
/**
* 异步发送消息
*/
public void asyncSend(MessageDTO message, SendCallback callback) {
rocketMQTemplate.asyncSend(TOPIC, message, callback);
}
/**
* 单向发送消息
*/
public void sendOneWay(MessageDTO message) {
rocketMQTemplate.sendOneWay(TOPIC, message);
}
}
2. 高级生产者功能
创建一个包含更多高级功能的生产者服务:
package com.example.rocketmq.producer;
import com.example.rocketmq.model.MessageDTO;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class AdvancedProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private static final String TOPIC = "spring-boot-topic";
/**
* 发送带标签的消息
*/
public SendResult sendWithTag(MessageDTO message, String tag) {
String destination = TOPIC + ":" + tag;
return rocketMQTemplate.syncSend(destination, message);
}
/**
* 发送延时消息
* @param delayLevel 延时级别,从1开始:1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
*/
public SendResult sendDelayMessage(MessageDTO message, int delayLevel) {
Message<MessageDTO> msg = MessageBuilder.withPayload(message).build();
return rocketMQTemplate.syncSend(TOPIC, msg, 3000, delayLevel);
}
/**
* 发送带属性的消息
*/
public SendResult sendWithProperties(MessageDTO message, String key, String value) {
Message<MessageDTO> msg = MessageBuilder.withPayload(message)
.setHeader(key, value)
.setHeader(RocketMQHeaders.KEYS, message.getId())
.build();
return rocketMQTemplate.syncSend(TOPIC, msg);
}
/**
* 批量发送消息
*/
public SendResult sendBatchMessages(List<MessageDTO> messages) {
List<Message<MessageDTO>> messageList = new ArrayList<>();
for (MessageDTO message : messages) {
Message<MessageDTO> msg = MessageBuilder.withPayload(message).build();
messageList.add(msg);
}
return rocketMQTemplate.syncSend(TOPIC, messageList);
}
/**
* 发送事务消息
*/
public TransactionSendResult sendMessageInTransaction(MessageDTO message, Object arg) {
Message<MessageDTO> msg = MessageBuilder.withPayload(message)
.setHeader(RocketMQHeaders.TRANSACTION_ID, message.getId())
.build();
return rocketMQTemplate.sendMessageInTransaction(TOPIC, msg, arg);
}
}
消费者实现
1. 基本消费者
创建一个基本的消息消费者:
package com.example.rocketmq.consumer;
import com.example.rocketmq.model.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "my-consumer-group"
)
public class SimpleMessageConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
log.info("收到消息: {}", message);
// 处理消息的业务逻辑
}
}
2. 带标签的消费者
创建一个只消费特定标签消息的消费者:
package com.example.rocketmq.consumer;
import com.example.rocketmq.model.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "tag-consumer-group",
selectorType = SelectorType.TAG,
selectorExpression = "TagA || TagB"
)
public class TagMessageConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
log.info("收到TagA或TagB消息: {}", message);
// 处理消息的业务逻辑
}
}
3. SQL过滤的消费者
创建一个使用SQL表达式过滤消息的消费者:
package com.example.rocketmq.consumer;
import com.example.rocketmq.model.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "sql-consumer-group",
selectorType = SelectorType.SQL92,
selectorExpression = "region='北京' AND age>=18"
)
public class SqlFilterConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
log.info("收到SQL过滤消息: {}", message);
// 处理消息的业务逻辑
}
}
4. 顺序消息消费者
创建一个顺序消息消费者:
package com.example.rocketmq.consumer;
import com.example.rocketmq.model.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "ordered-consumer-group",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderedMessageConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
log.info("顺序消费消息: {}", message);
// 处理消息的业务逻辑
}
}
5. 广播消息消费者
创建一个广播模式的消息消费者:
package com.example.rocketmq.consumer;
import com.example.rocketmq.model.MessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "broadcast-consumer-group",
messageModel = MessageModel.BROADCASTING
)
public class BroadcastMessageConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
log.info("广播模式消费消息: {}", message);
// 处理消息的业务逻辑
}
}
事务消息实现
1. 事务监听器
创建一个事务消息监听器:
package com.example.rocketmq.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<String, RocketMQLocalTransactionState> localTrans = new ConcurrentHashMap<>();
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
log.info("执行本地事务,事务ID: {}", transId);
try {
// 模拟执行本地事务
// 实际业务中,这里可能是数据库操作等
Thread.sleep(500);
// 模拟不同的事务结果
int status = (int) (Math.random() * 3);
switch (status) {
case 0:
// 本地事务成功,提交事务消息
localTrans.put(transId, RocketMQLocalTransactionState.COMMIT);
return RocketMQLocalTransactionState.COMMIT;
case 1:
// 本地事务未知,等待回查
localTrans.put(transId, RocketMQLocalTransactionState.UNKNOWN);
return RocketMQLocalTransactionState.UNKNOWN;
case 2:
// 本地事务失败,回滚事务消息
localTrans.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.ROLLBACK;
default:
return RocketMQLocalTransactionState.UNKNOWN;
}
} catch (Exception e) {
log.error("执行本地事务异常", e);
localTrans.put(transId, RocketMQLocalTransactionState.ROLLBACK);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 检查本地事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String) msg.getHeaders().get("rocketmq_TRANSACTION_ID");
log.info("回查本地事务,事务ID: {}", transId);
RocketMQLocalTransactionState state = localTrans.get(transId);
if (state != null) {
return state;
}
// 如果找不到事务状态,默认提交
// 实际业务中,应该根据业务数据判断事务状态
return RocketMQLocalTransactionState.COMMIT;
}
}
控制器实现
创建一个控制器,提供API接口来发送消息:
MessageController.java
package com.example.rocketmq.controller;
import com.example.rocketmq.model.MessageDTO;
import com.example.rocketmq.producer.AdvancedProducerService;
import com.example.rocketmq.producer.MessageProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Slf4j
@RestController
@RequestMapping("/message")
public class MessageController {
@Autowired
private MessageProducerService producerService;
@Autowired
private AdvancedProducerService advancedProducerService;
/**
* 同步发送消息
*/
@PostMapping("/sync")
public SendResult syncSend(@RequestBody String content) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
return producerService.syncSend(message);
}
/**
* 异步发送消息
*/
@PostMapping("/async")
public String asyncSend(@RequestBody String content) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
producerService.asyncSend(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送成功: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error("异步发送异常", e);
}
});
return "消息已异步发送";
}
/**
* 单向发送消息
*/
@PostMapping("/oneway")
public String sendOneWay(@RequestBody String content) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
producerService.sendOneWay(message);
return "消息已单向发送";
}
/**
* 发送带标签的消息
*/
@PostMapping("/tag/{tag}")
public SendResult sendWithTag(@RequestBody String content, @PathVariable String tag) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
return advancedProducerService.sendWithTag(message, tag);
}
/**
* 发送延时消息
*/
@PostMapping("/delay/{level}")
public SendResult sendDelayMessage(@RequestBody String content, @PathVariable int level) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
return advancedProducerService.sendDelayMessage(message, level);
}
/**
* 发送带属性的消息
*/
@PostMapping("/property")
public SendResult sendWithProperties(
@RequestBody String content,
@RequestParam String key,
@RequestParam String value) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
return advancedProducerService.sendWithProperties(message, key, value);
}
/**
* 批量发送消息
*/
@PostMapping("/batch/{count}")
public SendResult sendBatchMessages(@RequestBody String content, @PathVariable int count) {
List<MessageDTO> messages = new ArrayList<>();
for (int i = 0; i < count; i++) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content + " - " + i,
System.currentTimeMillis()
);
messages.add(message);
}
return advancedProducerService.sendBatchMessages(messages);
}
/**
* 发送事务消息
*/
@PostMapping("/transaction")
public TransactionSendResult sendTransactionMessage(@RequestBody String content) {
MessageDTO message = new MessageDTO(
UUID.randomUUID().toString(),
content,
System.currentTimeMillis()
);
return advancedProducerService.sendMessageInTransaction(message, null);
}
}
应用启动类
创建 Spring Boot 应用启动类:
package com.example.rocketmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMQSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSpringBootApplication.class, args);
}
}
实际应用场景
1. 异步处理
在需要异步处理的场景中,可以使用 RocketMQ 解耦业务流程,提高系统响应速度。
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderRepository orderRepository;
/**
* 创建订单
*/
@Transactional
public Order createOrder(OrderRequest request) {
// 1. 创建订单
Order order = new Order();
order.setOrderId(UUID.randomUUID().toString());
order.setUserId(request.getUserId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(new Date());
// 2. 保存订单
orderRepository.save(order);
// 3. 异步发送消息,通知其他系统
rocketMQTemplate.asyncSend("order-topic", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("订单创建消息发送成功: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error("订单创建消息发送失败", e);
}
});
return order;
}
}
2. 分布式事务
在涉及多个系统的分布式事务场景中,可以使用 RocketMQ 的事务消息特性保证数据一致性。
@Service
public class PaymentService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private PaymentRepository paymentRepository;
/**
* 处理支付
*/
public void processPayment(PaymentRequest request) {
// 1. 构建支付消息
Payment payment = new Payment();
payment.setPaymentId(UUID.randomUUID().toString());
payment.setOrderId(request.getOrderId());
payment.setAmount(request.getAmount());
payment.setStatus(PaymentStatus.PROCESSING);
// 2. 发送事务消息
Message<Payment> message = MessageBuilder.withPayload(payment)
.setHeader(RocketMQHeaders.TRANSACTION_ID, payment.getPaymentId())
.build();
rocketMQTemplate.sendMessageInTransaction("payment-topic", message, payment);
}
}
@Component
@RocketMQTransactionListener
public class PaymentTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private PaymentRepository paymentRepository;
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Payment payment = (Payment) arg;
try {
// 执行本地事务:保存支付记录
paymentRepository.save(payment);
// 调用第三方支付接口
boolean paymentSuccess = callThirdPartyPayment(payment);
if (paymentSuccess) {
// 支付成功,更新支付状态
payment.setStatus(PaymentStatus.SUCCESS);
paymentRepository.save(payment);
return RocketMQLocalTransactionState.COMMIT;
} else {
// 支付失败,更新支付状态
payment.setStatus(PaymentStatus.FAILED);
paymentRepository.save(payment);
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("支付处理异常", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String paymentId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
// 查询支付状态
Payment payment = paymentRepository.findByPaymentId(paymentId);
if (payment != null) {
if (PaymentStatus.SUCCESS.equals(payment.getStatus())) {
return RocketMQLocalTransactionState.COMMIT;
} else if (PaymentStatus.FAILED.equals(payment.getStatus())) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.UNKNOWN;
}
private boolean callThirdPartyPayment(Payment payment) {
// 调用第三方支付接口的逻辑
// 这里简化处理,随机返回成功或失败
return Math.random() > 0.3;
}
}
3. 削峰填谷
在流量高峰期,可以使用 RocketMQ 削峰填谷,避免系统过载。
@RestController
@RequestMapping("/api/v1/orders")
public class OrderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 下单接口
*/
@PostMapping
public ResponseEntity<String> placeOrder(@RequestBody OrderRequest request) {
// 将订单请求发送到消息队列,异步处理
rocketMQTemplate.syncSend("order-request-topic", request);
return ResponseEntity.ok("订单已提交,正在处理中");
}
}
@Component
@RocketMQMessageListener(
topic = "order-request-topic",
consumerGroup = "order-processor-group"
)
public class OrderRequestConsumer implements RocketMQListener<OrderRequest> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(OrderRequest request) {
try {
// 处理订单请求
orderService.createOrder(request);
} catch (Exception e) {
log.error("处理订单请求异常", e);
// 异常处理逻辑
}
}
}
最佳实践
1. 消息幂等性处理
在消费者端实现幂等性处理,避免重复消费导致的问题:
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "idempotent-consumer-group"
)
public class IdempotentConsumer implements RocketMQListener<MessageDTO> {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String CONSUMED_MESSAGE_KEY_PREFIX = "consumed:message:";
@Override
public void onMessage(MessageDTO message) {
// 使用消息ID作为幂等性检查的key
String consumedKey = CONSUMED_MESSAGE_KEY_PREFIX + message.getId();
// 使用Redis的setIfAbsent方法实现幂等性检查
Boolean isFirstConsumed = redisTemplate.opsForValue().setIfAbsent(consumedKey, "1", 7, TimeUnit.DAYS);
if (Boolean.TRUE.equals(isFirstConsumed)) {
// 首次消费,执行业务逻辑
log.info("处理消息: {}", message);
processMessage(message);
} else {
// 重复消费,忽略
log.info("消息已处理,忽略: {}", message);
}
}
private void processMessage(MessageDTO message) {
// 处理消息的业务逻辑
}
}
2. 消息重试处理
对于消费失败的消息,可以通过自定义重试策略进行处理:
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "retry-consumer-group",
maxReconsumeTimes = 3
)
public class RetryConsumer implements RocketMQListener<MessageDTO> {
@Override
public void onMessage(MessageDTO message) {
try {
// 处理消息的业务逻辑
processMessage(message);
} catch (Exception e) {
// 获取消息的重试次数
MessageAccessor.getReconsumeTime();
// 根据异常类型和重试次数决定是否重试
if (e instanceof BusinessException) {
// 业务异常,不再重试
log.error("业务异常,不再重试: {}", e.getMessage());
return;
}
// 其他异常,抛出异常触发重试
log.error("处理消息异常,将进行重试", e);
throw new RuntimeException("处理消息异常,触发重试", e);
}
}
private void processMessage(MessageDTO message) {
// 处理消息的业务逻辑
}
}
3. 消息轨迹追踪
开启消息轨迹功能,方便问题排查:
# application.yml
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group
enable-msg-trace: true # 开启消息轨迹
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义轨迹Topic
常见问题
1. 消息发送失败如何处理?
消息发送失败的处理策略:
- 重试机制:设置合理的重试次数和重试间隔
- 本地消息表:将消息先保存到本地数据库,定时任务扫描发送
- 消息补偿:通过定时任务或手动触发重新发送失败的消息
@Service
public class ReliableMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private MessageRepository messageRepository;
/**
* 可靠发送消息
*/
@Transactional
public void sendMessageReliably(MessageDTO message, String topic) {
// 1. 保存消息到本地表
LocalMessage localMessage = new LocalMessage();
localMessage.setMessageId(message.getId());
localMessage.setTopic(topic);
localMessage.setContent(JSON.toJSONString(message));
localMessage.setStatus(MessageStatus.PENDING);
localMessage.setCreateTime(new Date());
messageRepository.save(localMessage);
try {
// 2. 发送消息到RocketMQ
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
// 3. 发送成功,更新本地消息状态
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
localMessage.setStatus(MessageStatus.SENT);
localMessage.setUpdateTime(new Date());
messageRepository.save(localMessage);
}
} catch (Exception e) {
// 发送失败,由定时任务补偿
log.error("消息发送失败,将由定时任务补偿", e);
}
}
}
@Component
public class MessageCompensationTask {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 定时补偿发送失败的消息
*/
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void compensateSendMessage() {
// 查询发送失败的消息
List<LocalMessage> failedMessages = messageRepository.findByStatusAndCreateTimeBefore(
MessageStatus.PENDING,
new Date(System.currentTimeMillis() - 5 * 60 * 1000) // 5分钟前的消息
);
for (LocalMessage message : failedMessages) {
try {
// 重新发送消息
MessageDTO messageDTO = JSON.parseObject(message.getContent(), MessageDTO.class);
SendResult sendResult = rocketMQTemplate.syncSend(message.getTopic(), messageDTO);
// 发送成功,更新状态
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
message.setStatus(MessageStatus.SENT);
message.setUpdateTime(new Date());
messageRepository.save(message);
}
} catch (Exception e) {
log.error("补偿发送消息失败: {}", message.getMessageId(), e);
// 更新重试次数
message.setRetryCount(message.getRetryCount() + 1);
message.setUpdateTime(new Date());
messageRepository.save(message);
}
}
}
}
2. 如何处理消费者启动时堆积的大量消息?
处理消费者启动时堆积的大量消息的策略:
- 增加消费者实例:临时增加消费者实例,提高并行处理能力
- 批量消费:增加批量消费的消息数量,减少网络开销
- 消费者线程池:增加消费者线程数,提高单个消费者的处理能力
- 消息过滤:根据业务需求,过滤掉不需要处理的历史消息
// 配置消费者参数
@Component
@RocketMQMessageListener(
topic = "spring-boot-topic",
consumerGroup = "bulk-consumer-group",
consumeThreadMax = 20, // 最大消费线程数
consumeThreadMin = 10, // 最小消费线程数
consumeMessageBatchMaxSize = 32 // 批量消费最大消息数
)
public class BulkMessageConsumer implements RocketMQListener<List<MessageExt>> {
@Override
public void onMessage(List<MessageExt> messages) {
// 批量处理消息
log.info("批量处理 {} 条消息", messages.size());
// 处理消息的业务逻辑
}
}
总结
本文详细介绍了 RocketMQ 与 Spring Boot 的集成方法:
- ✅ 环境准备:引入依赖和配置文件
- ✅ 生产者实现:基本生产者和高级生产者功能
- ✅ 消费者实现:基本消费者、标签过滤、SQL过滤、顺序消费和广播消费
- ✅ 事务消息实现:事务监听器的实现
- ✅ 控制器实现:提供API接口发送消息
- ✅ 实际应用场景:异步处理、分布式事务和削峰填谷
- ✅ 最佳实践:消息幂等性处理、消息重试处理和消息轨迹追踪
下一步学习
- 学习 RocketMQ 的集群部署和运维
- 了解 RocketMQ 的监控和告警
- 探索 RocketMQ 在微服务架构中的应用
希望这篇文章能帮助您在 Spring Boot 项目中快速集成 RocketMQ!如果您有任何问题,欢迎在评论区讨论。