小言_互联网的博客

SpringBoot整合RabbitMQ

568人阅读  评论(0)


源码


一、RabbitMQ的安装

1.安装

  • 连接上linux服务器

  • 输入 docker images查看是否安装

    可以看到是没有安装rabbitMQ的

  • 输入docker pull rabbitmq:3.6-management
    management是带web管理页面的

  • 输入docker images检查是否有镜像

    可以看到已经安装完成

  • 运行RabbitMQ,docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 9df8e9792ce6

第一个-p是客户端与RabbitMQ通信的端口
第二个-p是管理界面访问web界面的端口

  • 输入docker ps

    可以看到RabbitMQ已经在后台运行

2.检测

  • 输入systemctl stop firewalld.service关闭防火墙

  • 如果是阿里云的服务器,需要开放端口

  • 在浏览器中输入服务器的ip地址:15672
    用户名和密码都是默人的guest

3.测试

(1)创建需要的交换器

  • 在RabbitMQ管理界面操作

  • 添加如下三个交换器

(2)添加消息队列

(3)消息队列与交换器的绑定

  • 绑定关系

  • 消息队列要想起作用就要和交换器绑定

  • 在Exchanges中点击需要绑定的交换器

  • 点击Bindings进行绑定

  • 如图所示:每个交换器都要绑定所有的消息队列

  • exchange.topic的绑定规则如下:

发送消息

(1)direct交换器(点对点)

  • 点开exchange.direct

  • 在下面发送消息

  • 因为direct(点对点)交换器是完全匹配路由键的,所以只有一个队列可以收到

(2)fanout交换器(广播)

  • 点开exchange.fanout
  • 发送消息


    可以看到都收到了消息,fanout是广播的(不管路由键是什么,全都发过去)

(3)topic(有选择的广播)

  • 点开exchange.fanout
  • 发送以下消息


可以看到*.news匹配规则的都消息数量都增加了1,所以topic是有选择的广播

二、RabbitMQ的整合

创建一个SpringBoot项目

  • 使用SpringBoot的初始化向导创建一个项目

了解RabbitMQ自动配置

  • 自动配置类(RabbitAutoConfiguration)
  • 1、CachingConnectionFactory:;连接工厂
  • 2、RabbitProperties: 封装了RabbitMQ的所有配置
  • 3、RabbitTemplate:给RabbitMQ发送和接收消息
  • 4、AmqpAdmin:RabbitMQ的系统管理组件

使用RabbitTemplate发送消息

用到的方法

(1)需要自定义消息

/*Message需要自己构造一个;定制消息体内容和消息头*/
        rabbitTemplate.send(exchange,routeKey,message);

(2)直接用对象序列化成消息

  /*object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq*/
        rabbitTemplate.convertAndSend(exchange,routeKey,object);

(3)单播(direct)发送(点对点式)

  • 在测试类中输入以下代码:
@Test
    public void test01(){
        Map map = new HashMap();
        map.put("msg","这是第一个消息");
        map.put("data", Arrays.asList("helloworld",123,true));
        rabbitTemplate.convertAndSend("exchange.direct","atdanqing.news",map);
    }
  • 运行这个测试类,登录的web的管理界面,可以看到新增了一条消息


    这些字节码是通过默认的java序列化后的样子,不直观,我们希望用json的序列化形式展现出来

  • 如何将数据转化为json呢???

  • 在与主程序相同的路径下创建config包,在该包下创建MyAMQPConfig类

  • 在该类添加代码如下:

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • 运行上面的测试类,同样的方法看数据是否为json数据

(4)接收数据

  • 在测试类写如下代码:
@Test
    public void receive(){
        Object o = rabbitTemplate.receiveAndConvert("atdanqing.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }


可以看到封装的HashMap数据,当我们取出这个数据时,队列当中就会失去这个消息

(5)测试广播(fanout)

  • 在主程序相同的目录下创建bean包,在该包下创建Book类
public class Book {
    private String bookName;
    private String author;

    public Book(String bookName,String author) {
        this.bookName = bookName;
        this.author = author;
    }

    public Book() {
    }

    public String getBookName() {
        return bookName;
    }

    public void setBookName(String bookName) {
        this.bookName = bookName;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
    @Override
    public String toString() {
        return "Book{" +
                "bookName='" + bookName + '\'' +
                ", author='" + author + '\'' +
                '}';
    }
}
  • 在测试类中写测试广播的方法
@Test
    public void test02(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("三国演义","施耐庵"));
    }

三个消息队列都会收到消息

(6)测试topic

只需要指定相应的路由键就可以了,和上面的类似,这里略过了

@RabbitListener、@EnableRabbit

不需要得到消息头

  • 在与主程序相同的目录下创建一个service包,在该包下创建一个BookService类
@Service
public class BookService {

    @RabbitListener(queues = "atdanqing.news")//指定监听那一个消息队列
    public void receive(Book book){
        System.out.println("收到消息"+book);
    }
}
  • 要想让@RabbitListener起作用,必须要@EnableRabbit支持

  • 在主程序上加上@EnableRabbit注解

  • 启动项目,清除控制台

  • 在测试类中再写一个方法运行

@Test
    public void test03(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("西游记","吴承恩"));
    }

切换到主程序的控制台可以看到

已经监听到了消息

需要得到消息头

这样得到的消息是没有消息头的,如果想要得到消息头用下面的方法

  • 在BookService类里添加如下方法:
    @RabbitListener(queues = "atdanqing")//指定监听那一个消息队列
    public void receive02(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

启动项目如果atdanqing消息队列里有消息就可以看到如图类似的形式,没有的话可以添加几个消息。

AmqpAdmin管理组件的使用

上面的这些操作都是因为Exchange和Queue都已经创建好了,如果没有创建好就需要在程序中用AmqpAdmin组件去创建。
AmqpAdmin的作用:创建和删除Quence,Exchange

测试AmqpAdmin管理组件的使用

  • 在测试类中添加以下代码
    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange(){
        amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
        System.out.println("创建完成!!!");
    }
  • 运行项目,在web端查看是否创建成功

    可以看到创建成功

  • 注释掉上面创建exchange的部分,添加是创建

amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
  • 运行测试类,查看消息队列是否创建成功

    可以看到创建成功

  • 创建绑定规则

  • 在测试类上注释掉上面的内容

amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
  • 运行测试类

    可以看到绑定上了

转载:https://blog.csdn.net/DQdanqing/article/details/105861600
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场