RabbitMQ 是一个高效、可靠的消息代理中间件,广泛应用于分布式系统中,用于实现微服务之间的异步通信和解耦。
一、RabbitMQ 基本概念
1.1 核心概念
队列(Queue):
- 存储消息的容器,消息由生产者发送到队列,消费者从队列中取出进行处理。支持点对点(P2P)模式。
交换机(Exchange):
负责根据路由规则将消息分发到队列。
- 常见交换机类型:Direct、Fanout、Topic。
绑定(Binding):
- 连接交换机和队列的规则,结合路由键(Routing Key)定义消息的路由方式。
消息模型:
- 点对点模式(P2P): 一条消息只能被一个消费者消费。
- 发布订阅模式(Pub/Sub): 一条消息可以被多个消费者消费。
二、三种交换机详解
2.1 Direct 交换机
- 路由方式: 消息根据路由键(Routing Key)精准匹配到指定队列。
- 优点: 简单、高效,适用于明确路由需求的场景。
- 缺点: 需要为每个队列绑定具体的路由键,维护成本较高。
- 应用场景: 点对点通知,如订单支付成功通知。
2.2 Fanout 交换机
- 路由方式: 广播消息到所有绑定队列,无需路由键。
- 优点: 适合广播场景,无需复杂路由配置。
- 缺点: 不支持消息过滤,容易造成资源浪费。
- 应用场景: 全网公告或系统事件通知。
2.3 Topic 交换机
路由方式: 根据通配符模糊匹配路由键。
*
匹配一个单词,#
匹配零个或多个单词。
- 优点: 灵活的路由规则,支持复杂消息分发需求。
- 缺点: 配置复杂,性能略低于其他类型交换机。
- 应用场景: 新闻分类推送或订阅系统。
三、Spring Boot 与 RabbitMQ 整合
3.1 项目依赖
在 pom.xml
文件中添加 RabbitMQ 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2 RabbitMQ 配置两种方式
方法一:使用 application.yml
配置
在 application.yml
文件中配置 RabbitMQ:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
方法二:通过配置类创建连接工厂
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQFactoryConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
}
3.3 交换机、队列及绑定的两种创建方式
方法一:直接使用 new
创建
import org.springframework.amqp.core.*;
@Configuration
public class RabbitMQDirectConfig {
@Bean
public Queue directQueue() {
return new Queue("direct.queue");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
@Bean
public Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("direct.key");
}
}
方法二:通过工厂类创建
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQFactoryConfig {
@Bean
public Queue fanoutQueue() {
return QueueBuilder.durable("fanout.queue").build();
}
@Bean
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange("fanout.exchange").durable(true).build();
}
@Bean
public Binding bindingFanout(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
}
3.4 消息生产者与消费者
消息生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public MessageProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendDirectMessage(String message) {
rabbitTemplate.convertAndSend("direct.exchange", "direct.key", message);
}
public void sendFanoutMessage(String message) {
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
}
public void sendTopicMessage(String message) {
rabbitTemplate.convertAndSend("topic.exchange", "topic.test", message);
}
}
消息消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@RabbitListener(queues = "direct.queue")
public void receiveDirectMessage(String message) {
System.out.println("Received Direct Message: " + message);
}
@RabbitListener(queues = "fanout.queue")
public void receiveFanoutMessage(String message) {
System.out.println("Received Fanout Message: " + message);
}
}
3.5 单元测试
使用 JUnit 测试消息发送功能:
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testDirectMessage() {
messageProducer.sendDirectMessage("Test Direct Exchange");
}
@Test
public void testFanoutMessage() {
messageProducer.sendFanoutMessage("Test Fanout Exchange");
}
}
四、总结
本文通过介绍 RabbitMQ 的核心概念,分析了三种交换机的优缺点及适用场景,并通过 Spring Boot 实现了 MQ 的完整配置与消息生产、消费功能。提供的两种实现方式(application.yml
配置和配置类方式,以及直接 new
和工厂创建方式)为不同项目需求提供了灵活选择。
评论 (0)