RabbitMQ 消息队列详解及 Spring Boot 实战教程

RabbitMQ 消息队列详解及 Spring Boot 实战教程

尽意
2024-11-26 / 0 评论 / 29 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2024年11月26日,已超过57天没有更新,若内容或图片失效,请留言反馈。

4e3e990d0415da1b.jpg

RabbitMQ 是一个高效、可靠的消息代理中间件,广泛应用于分布式系统中,用于实现微服务之间的异步通信和解耦。

一、RabbitMQ 基本概念

1.1 核心概念

  1. 队列(Queue):

    • 存储消息的容器,消息由生产者发送到队列,消费者从队列中取出进行处理。支持点对点(P2P)模式。
  2. 交换机(Exchange):

    • 负责根据路由规则将消息分发到队列。

      • 常见交换机类型:Direct、Fanout、Topic。
  3. 绑定(Binding):

    • 连接交换机和队列的规则,结合路由键(Routing Key)定义消息的路由方式。
  4. 消息模型:

    • 点对点模式(P2P): 一条消息只能被一个消费者消费。
    • 发布订阅模式(Pub/Sub): 一条消息可以被多个消费者消费。

二、三种交换机详解

2.1 Direct 交换机

  • 路由方式: 消息根据路由键(Routing Key)精准匹配到指定队列。
  • 优点: 简单、高效,适用于明确路由需求的场景。
  • 缺点: 需要为每个队列绑定具体的路由键,维护成本较高。
  • 应用场景: 点对点通知,如订单支付成功通知。

2.2 Fanout 交换机

  • 路由方式: 广播消息到所有绑定队列,无需路由键。
  • 优点: 适合广播场景,无需复杂路由配置。
  • 缺点: 不支持消息过滤,容易造成资源浪费。
  • 应用场景: 全网公告或系统事件通知。

2.3 Topic 交换机

  • 路由方式: 根据通配符模糊匹配路由键。

    • * 匹配一个单词,# 匹配零个或多个单词。
  • 优点: 灵活的路由规则,支持复杂消息分发需求。
  • 缺点: 配置复杂,性能略低于其他类型交换机。
  • 应用场景: 新闻分类推送或订阅系统。

三、Spring Boot 与 RabbitMQ 整合

3.1 项目依赖

pom.xml 文件中添加 RabbitMQ 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 RabbitMQ 配置两种方式

方法一:使用 application.yml 配置

application.yml 文件中配置 RabbitMQ:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

方法二:通过配置类创建连接工厂

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQFactoryConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }
}

3.3 交换机、队列及绑定的两种创建方式

方法一:直接使用 new 创建

import org.springframework.amqp.core.*;

@Configuration
public class RabbitMQDirectConfig {

    @Bean
    public Queue directQueue() {
        return new Queue("direct.queue");
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    @Bean
    public Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("direct.key");
    }
}

方法二:通过工厂类创建

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQFactoryConfig {

    @Bean
    public Queue fanoutQueue() {
        return QueueBuilder.durable("fanout.queue").build();
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return ExchangeBuilder.fanoutExchange("fanout.exchange").durable(true).build();
    }

    @Bean
    public Binding bindingFanout(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

3.4 消息生产者与消费者

消息生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendDirectMessage(String message) {
        rabbitTemplate.convertAndSend("direct.exchange", "direct.key", message);
    }

    public void sendFanoutMessage(String message) {
        rabbitTemplate.convertAndSend("fanout.exchange", "", message);
    }

    public void sendTopicMessage(String message) {
        rabbitTemplate.convertAndSend("topic.exchange", "topic.test", message);
    }
}

消息消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {

    @RabbitListener(queues = "direct.queue")
    public void receiveDirectMessage(String message) {
        System.out.println("Received Direct Message: " + message);
    }

    @RabbitListener(queues = "fanout.queue")
    public void receiveFanoutMessage(String message) {
        System.out.println("Received Fanout Message: " + message);
    }
}

3.5 单元测试

使用 JUnit 测试消息发送功能:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    public void testDirectMessage() {
        messageProducer.sendDirectMessage("Test Direct Exchange");
    }

    @Test
    public void testFanoutMessage() {
        messageProducer.sendFanoutMessage("Test Fanout Exchange");
    }
}

四、总结

本文通过介绍 RabbitMQ 的核心概念,分析了三种交换机的优缺点及适用场景,并通过 Spring Boot 实现了 MQ 的完整配置与消息生产、消费功能。提供的两种实现方式(application.yml 配置和配置类方式,以及直接 new 和工厂创建方式)为不同项目需求提供了灵活选择。

3

评论 (0)

取消