RabbitMQ基础入门
2025/8/15大约 4 分钟
RabbitMQ基础入门
前置知识
在开始本教程之前,建议您具备以下基础知识:
- Java基础语法
- Maven或Gradle构建工具
- Spring框架基础
- 消息队列基本概念
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。RabbitMQ是使用Erlang语言来编写的,并且基于AMQP(Advanced Message Queuing Protocol)协议。
主要特点
- 可靠性:提供持久化、传输确认等机制
- 灵活路由:消息在到达队列前,可以经过多个交换机
- 集群:多个RabbitMQ服务器可以组成一个集群
- 高可用:队列可以在集群中复制,保证高可用性
- 多协议:支持AMQP、MQTT、STOMP等多种协议
核心概念
1. 基本组件
RabbitMQ的核心组件包括:
- Producer(生产者):发送消息的应用程序
- Consumer(消费者):接收消息的应用程序
- Exchange(交换机):接收生产者发送的消息,根据规则转发给队列
- Queue(队列):存储消息的缓冲区
- Binding(绑定):交换机和队列之间的连接关系
2. 交换机类型
- Direct Exchange:直接匹配路由键
- Topic Exchange:模式匹配路由键
- Fanout Exchange:广播消息给所有绑定的队列
- Headers Exchange:根据消息头属性匹配
环境准备
1. 添加依赖
<dependencies>
<!-- Spring AMQP依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2. 配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
快速开始
1. 配置类
@Configuration
public class RabbitConfig {
@Bean
public Queue simpleQueue() {
return new Queue("simple.queue", true);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("simple.exchange");
}
@Bean
public Binding binding(Queue simpleQueue, DirectExchange directExchange) {
return BindingBuilder.bind(simpleQueue)
.to(directExchange)
.with("simple.key");
}
}
2. 生产者
@Service
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("simple.exchange", "simple.key", message);
log.info("消息发送成功:{}", message);
}
}
3. 消费者
@Component
@Slf4j
public class MessageConsumer {
@RabbitListener(queues = "simple.queue")
public void receiveMessage(String message) {
log.info("收到消息:{}", message);
}
}
交换机示例
1. Direct Exchange
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue() {
return new Queue("direct.queue");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue())
.to(directExchange())
.with("direct.key");
}
}
2. Topic Exchange
Topic Exchange配置示例
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueue1() {
return new Queue("topic.queue.1");
}
@Bean
public Queue topicQueue2() {
return new Queue("topic.queue.2");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with("topic.#"); // 匹配topic.开头的所有路由键
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2())
.to(topicExchange())
.with("topic.*"); // 匹配topic.开头的一个单词
}
}
## 消息确认机制
### 1. 生产者确认
```yaml
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
@Service
@Slf4j
public class ConfirmMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送成功");
} else {
log.error("消息发送失败:{}", cause);
}
});
}
}
2. 消费者确认
@Component
@Slf4j
public class AckMessageConsumer {
@RabbitListener(queues = "confirm.queue")
public void receiveMessage(Message message, Channel channel) throws IOException {
try {
// 处理消息
log.info("处理消息:{}", new String(message.getBody()));
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
最佳实践
1. 消息持久化
建议
- 队列持久化:声明队列时设置durable=true
- 消息持久化:设置消息的deliveryMode=2
- 交换机持久化:声明交换机时设置durable=true
2. 消息可靠性
注意事项
- 启用生产者确认机制
- 使用消费者手动确认
- 考虑使用死信队列处理失败消息
- 实现消息幂等性处理
3. 性能优化
优化建议
- 合理设置预取数量(prefetch count)
- 使用消息批量处理
- 避免消息堆积
- 合理设置连接池大小
总结
本文详细介绍了RabbitMQ的:
- ✅ 核心概念和组件
- ✅ 基本配置和使用
- ✅ 交换机类型示例
- ✅ 消息确认机制
- ✅ 最佳实践建议
下一步学习
- 深入了解RabbitMQ的高级特性
- 学习死信队列和延迟队列
- 掌握集群环境配置
希望这篇文章能帮助您快速入门RabbitMQ!如果您有任何问题,欢迎在评论区讨论。