小言_互联网的博客

Springboot 整合 RabbitMQ(一)

372人阅读  评论(0)

本篇主要rabbitMq相关的一些简单理论介绍,安装须知,provider消息推送实例,consumer消息消费实例,Direct、Topic、Fanout的使用。

下一篇会讲解消息回调、手动确认等。

安装主要需要几个介质,Linux环境上 需安装erlang、 rabbitmqserver、socat 大家根据版本下载不要一定要安装对应版本否则无法启动。

Erlang 下载地址:

https://www.erlang-solutions.com/resources/download.html

安装完软件后进行配置:

1 配置文件

vim /etc/security/limits.conf 新增

* soft nofile 65535

* hard nofile 65535

 

2.配置配置文件,需要先将文件拷贝到/etc/rabbitmq/ 下然后重新命名一下。启动时会需要扫描该文件,可以在文件里设置ip 监听端口,默认帐号,开启远程访问等等。。。

cd /etc/rabbitmq

cp /usr/share/doc/rabbitmq-server-3.6.12/rabbitmq.config.example /etc/rabbitmq/

mv rabbitmq.config.example rabbitmq.config

设置开机自启动:

chkconfig rabbitmq-server on

3.开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config

 

注意要去掉后面的逗号。改成下图

4.开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management

service rabbitmq-server restart

 

 

5.启停RabbitMQ 服务

service rabbitmq-server start

service rabbitmq-server stop

service rabbitmq-server restart

6.切记开放端口:15672/5672

进如WEBUI界面进行后续配置,默认账户guest/guest

7.添加Virtual Hosts

可以先添加用户

第一个是我安装的rabbitmq版本第二个是erlang版本 3.6只能用erlang1.9.0或以下的版本

添加用户tags 可以设置这个用户的权限。

添加完Virtual Hosts 后可以使用用户何其进行关联

 

在这个页面控制台上我们可以做些什么?
可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。暂时我不截图做讲解等后续大家安装后点击看看基本都能理解。 

 8.交换机种类

  • Direct Exchange 

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 Routing Key 。
然后当一个消息携带着路由值为A,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值A去寻找绑定值也是A的队列。

  • Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

  • Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
介绍下规则:

*  (星号) 用来表示一个单词 (必须出现的)
#  (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 *.AA.*          队列Q2绑定键为 AA.#
如果一条消息携带的路由键为 A.CC.B,那么队列Q1将会收到;
如果一条消息携带的路由键为CC.AA.BB,那么队列Q2将会收到;

主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

除了上述三种还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机。

简单说以下死信交换机,有些公司会使用私信交换机进行消息延迟发送,或者将一些无法消费的消息移动到私信交换机进行再次处理。

9 编码

1 本教程创建2个springboot项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者)。

2 生产者和消费者都添加配置文件application.yml ,根据自己服务器设置的ip端口帐号进行填写即可


  
  1. server:
  2. port: 8021
  3. spring:
  4. #给项目来个名字
  5. application:
  6. name: rabbitmq-provider
  7. #配置rabbitMq 服务器
  8. rabbitmq:
  9. host: 192.168.3.32
  10. port: 5672
  11. username: admin
  12. password: admin
  13. #虚拟host 可以不设置,使用server默认host
  14. #virtual-host: Test

3  pom.xml 添加依赖 生成者和消费者都要添加


  
  1. <!--rabbitmq-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

10 我们先从生产者开始写 首先生产者里创建 RabbitConfig 队列定义和SendMessageController 接口


  
  1. package com.provider.rabbitmqprovider;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * @Title: RabbitMQ
  7. * @Description:
  8. * @author: liaryank
  9. * @Date: 2020/5/14 3:10 下午
  10. * @Version: 1.0
  11. */
  12. @Configuration
  13. public class RabbitConfig {
  14. //绑定键
  15. public final static String liaryank = "topic.liaryank";
  16. public final static String liar = "topic.liar";
  17. //队列 起名:TestDirectQueue
  18. @Bean
  19. public Queue TestDirectQueue() {
  20. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  21. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
  22. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
  23. // return new Queue("TestDirectQueue",true,true,false);
  24. //一般设置一下队列的持久化就好,其余两个就是默认false
  25. return new Queue( "TestDirectQueue", true);
  26. }
  27. //Direct交换机 起名:TestDirectExchange
  28. @Bean
  29. DirectExchange TestDirectExchange() {
  30. // return new DirectExchange("TestDirectExchange",true,true);
  31. return new DirectExchange( "TestDirectExchange", true, false);
  32. }
  33. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  34. @Bean
  35. Binding bindingDirect() {
  36. Binding testDirectRouting = BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with( "TestDirectRouting");
  37. return testDirectRouting;
  38. }
  39. @Bean
  40. DirectExchange lonelyDirectExchange() {
  41. return new DirectExchange( "lonelyDirectExchange");
  42. }
  43. // topic 类型
  44. @Bean
  45. public Queue firstQueue() {
  46. return new Queue(RabbitConfig.liaryank);
  47. }
  48. @Bean
  49. public Queue secondQueue() {
  50. return new Queue(RabbitConfig.liar);
  51. }
  52. @Bean
  53. TopicExchange exchange() {
  54. return new TopicExchange( "topicExchange");
  55. }
  56. //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  57. //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  58. @Bean
  59. Binding bindingExchangeMessage() {
  60. return BindingBuilder.bind(firstQueue()).to(exchange()).with(liaryank);
  61. }
  62. //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
  63. // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
  64. @Bean
  65. Binding bindingExchangeMessage2() {
  66. return BindingBuilder.bind(secondQueue()).to(exchange()).with( "topic.#");
  67. }
  68. /**
  69. * 创建三个队列 :fanout.A fanout.B fanout.C
  70. * 将三个队列都绑定在交换机 fanoutExchange 上
  71. * 因为是扇型交换机, 路由键无需配置,配置也不起作用
  72. */
  73. @Bean
  74. public Queue queueA() {
  75. return new Queue( "fanout.A");
  76. }
  77. @Bean
  78. public Queue queueB() {
  79. return new Queue( "fanout.B");
  80. }
  81. @Bean
  82. public Queue queueC() {
  83. return new Queue( "fanout.C");
  84. }
  85. @Bean
  86. FanoutExchange fanoutExchange() {
  87. return new FanoutExchange( "fanoutExchange");
  88. }
  89. @Bean
  90. Binding bindingExchangeA() {
  91. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  92. }
  93. @Bean
  94. Binding bindingExchangeB() {
  95. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  96. }
  97. @Bean
  98. Binding bindingExchangeC() {
  99. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  100. }
  101. }

 


  
  1. package com.provider.rabbitmqprovider;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. import java.util.UUID;
  11. /**
  12. * @Title: RabbitMQ
  13. * @Description:
  14. * @author: liaryank
  15. * @Date: 2020/5/14 3:17 下午
  16. * @Version: 1.0
  17. */
  18. @RestController
  19. public class SendMessageController {
  20. @Autowired
  21. RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
  22. //Direct类型
  23. @GetMapping( "/sendDirectMessage")
  24. public String sendDirectMessage() {
  25. String messageId = String.valueOf(UUID.randomUUID());
  26. String messageData = "message is hello!";
  27. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"));
  28. Map<String,Object> map= new HashMap<>();
  29. map.put( "messageId",messageId);
  30. map.put( "messageData",messageData);
  31. map.put( "createTime",createTime);
  32. //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
  33. rabbitTemplate.convertAndSend( "TestDirectExchange", "TestDirectRouting", map);
  34. return "ok";
  35. }
  36. //Topic类型
  37. @GetMapping( "/sendTopicMessage1")
  38. public String sendTopicMessage1() {
  39. String messageId = String.valueOf(UUID.randomUUID());
  40. String messageData = "message: liaryank~~~ ";
  41. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"));
  42. Map<String, Object> manMap = new HashMap<>();
  43. manMap.put( "messageId", messageId);
  44. manMap.put( "messageData", messageData);
  45. manMap.put( "createTime", createTime);
  46. rabbitTemplate.convertAndSend( "topicExchange", "topic.liaryank", manMap);
  47. return "ok";
  48. }
  49. @GetMapping( "/sendTopicMessage2")
  50. public String sendTopicMessage2() {
  51. String messageId = String.valueOf(UUID.randomUUID());
  52. String messageData = "message: msg is all ";
  53. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"));
  54. Map<String, Object> womanMap = new HashMap<>();
  55. womanMap.put( "messageId", messageId);
  56. womanMap.put( "messageData", messageData);
  57. womanMap.put( "createTime", createTime);
  58. rabbitTemplate.convertAndSend( "topicExchange", "topic.liar", womanMap);
  59. return "ok";
  60. }
  61. //Fanout类型
  62. @GetMapping( "/sendFanoutMessage")
  63. public String sendFanoutMessage() {
  64. String messageId = String.valueOf(UUID.randomUUID());
  65. String messageData = "message: testFanoutMessage ";
  66. String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"));
  67. Map<String, Object> map = new HashMap<>();
  68. map.put( "messageId", messageId);
  69. map.put( "messageData", messageData);
  70. map.put( "createTime", createTime);
  71. rabbitTemplate.convertAndSend( "fanoutExchange", null, map);
  72. return "ok";
  73. }
  74. }

 

生产者和接口就定义完成我们来继续写消费者

11 三种类型交换机和其对应的消费 

12  Direct类型


  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Title: RabbitMQ
  10. * @Description:
  11. * @author: liaryank
  12. * @Date: 2020/5/14 4:01 下午
  13. * @Version: 1.0
  14. */
  15. @Configuration
  16. public class DirectRabbitConfig {
  17. //队列 起名:TestDirectQueue
  18. @Bean
  19. public Queue TestDirectQueue() {
  20. return new Queue( "TestDirectQueue", true);
  21. }
  22. //Direct交换机 起名:TestDirectExchange
  23. @Bean
  24. DirectExchange TestDirectExchange() {
  25. return new DirectExchange( "TestDirectExchange");
  26. }
  27. //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
  28. @Bean
  29. Binding bindingDirect() {
  30. return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with( "TestDirectRouting");
  31. }
  32. }

  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description:
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:03 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "TestDirectQueue") //监听的队列名称 TestDirectQueue
  15. public class DirectReceiver {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "DirectReceiver消费者收到消息 : " + testMessage.toString());
  19. }
  20. }

13 FanoutRabbit类型


  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Title: RabbitMQ
  10. * @Description:
  11. * @author: liaryank
  12. * @Date: 2020/5/14 4:38 下午
  13. * @Version: 1.0
  14. */
  15. @Configuration
  16. public class FanoutRabbitConfig {
  17. /**
  18. * 创建三个队列 :fanout.A fanout.B fanout.C
  19. * 将三个队列都绑定在交换机 fanoutExchange 上
  20. * 因为是扇型交换机, 路由键无需配置,配置也不起作用
  21. */
  22. @Bean
  23. public Queue queueA() {
  24. return new Queue( "fanout.A");
  25. }
  26. @Bean
  27. public Queue queueB() {
  28. return new Queue( "fanout.B");
  29. }
  30. @Bean
  31. public Queue queueC() {
  32. return new Queue( "fanout.C");
  33. }
  34. @Bean
  35. FanoutExchange fanoutExchange() {
  36. return new FanoutExchange( "fanoutExchange");
  37. }
  38. @Bean
  39. Binding bindingExchangeA() {
  40. return BindingBuilder.bind(queueA()).to(fanoutExchange());
  41. }
  42. @Bean
  43. Binding bindingExchangeB() {
  44. return BindingBuilder.bind(queueB()).to(fanoutExchange());
  45. }
  46. @Bean
  47. Binding bindingExchangeC() {
  48. return BindingBuilder.bind(queueC()).to(fanoutExchange());
  49. }
  50. }

 


  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description:
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:36 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "fanout.A")
  15. public class FanoutReceiverA {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "FanoutReceiverA消费者收到消息 : " +testMessage.toString());
  19. }
  20. }

  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description:
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:36 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "fanout.B")
  15. public class FanoutReceiverB {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "FanoutReceiverB消费者收到消息 : " +testMessage.toString());
  19. }
  20. }

  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description:
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:36 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "fanout.C")
  15. public class FanoutReceiverC {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "FanoutReceiverC消费者收到消息 : " +testMessage.toString());
  19. }
  20. }

 

14 Topic类型


  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description: topic
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:25 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "topic.liaryank")
  15. public class TopicManReceiver {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "TopicManReceiver消费者收到消息 : " + testMessage.toString());
  19. }
  20. }

  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Title: RabbitMQ
  10. * @Description:
  11. * @author: liaryank
  12. * @Date: 2020/5/14 4:27 下午
  13. * @Version: 1.0
  14. */
  15. @Configuration
  16. public class TopicRabbitConfig {
  17. //绑定键
  18. public final static String liaryank = "topic.liaryank";
  19. public final static String liar = "topic.liar";
  20. @Bean
  21. public Queue firstQueue() {
  22. return new Queue(TopicRabbitConfig.liaryank);
  23. }
  24. @Bean
  25. public Queue secondQueue() {
  26. return new Queue(TopicRabbitConfig.liar);
  27. }
  28. @Bean
  29. TopicExchange exchange() {
  30. return new TopicExchange( "topicExchange");
  31. }
  32. //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  33. //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  34. @Bean
  35. Binding bindingExchangeMessage() {
  36. return BindingBuilder.bind(firstQueue()).to(exchange()).with(liaryank);
  37. }
  38. //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
  39. // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
  40. @Bean
  41. Binding bindingExchangeMessage2() {
  42. return BindingBuilder.bind(secondQueue()).to(exchange()).with( "topic.#");
  43. }
  44. }

  
  1. package com.consumer.rabbitmqconsumer;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * @Title: RabbitMQ
  8. * @Description:
  9. * @author: liaryank
  10. * @Date: 2020/5/14 4:26 下午
  11. * @Version: 1.0
  12. */
  13. @Component
  14. @RabbitListener(queues = "topic.liar")
  15. public class TopicTotalReceiver {
  16. @RabbitHandler
  17. public void process(Map testMessage) {
  18. System.out.println( "TopicTotalReceiver消费者收到消息 : " + testMessage.toString());
  19. }
  20. }

 

ok到这里我们生产者和消费者就写完了,下面我们来测试调用看看

首先启动生产者和消费者两个项目,然后postman 调用 

均启动

15 Direct类型调用

 消费者输出结果

 

16 Topic类型测试 调用message1 和message2 

 

 

17 Fanout类型测试

 

 

ok 到此三种类型交换机简单收发消息就完成了, 消息回调和手动签收大家可以等我后面发帖,也可自行先百度解决。

 

 

 

 

 

 

 

 

 


转载:https://blog.csdn.net/Liaryank/article/details/106181089
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场