飞道的博客

RabbitMQ交换机

298人阅读  评论(0)

1. Exchange作用

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。

生产者--(创建消息)-->交换机--(路由键)-->队列--(pull/push)-->消费者

2. Exchange的类型

1)直连交换器: Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的

什么是路由键?
每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串

直连交换机适用场景?
有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

直连交换机不适合的场景
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么直连交换机就不合适了

2)主题交换机: Topic Exchange(发布/订阅)
RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的routing_key定义规则:
交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:
*表示一个单词
#表示任意数量(零个或多个)单词

示例:


  
  1. Q1: *.TT.*
  2. Q2: TT.#
  3. 如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
  4. 如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

3)广播交换机: Fanout Exchange
用于广播消息,将发送到Exchange中的消息发送到与该交换器关联的所有队列中。

3. 死信队列

死信队列用于存储没匹配队列的消息,超时没有被处理的消息,如果没有配置死信队列这些消息会被丢弃。即当出现没有匹配的队列的消息,或是超时的消息则将消息转入到死信队列里去,等待重新处理或人工干预。

死信队列的应用场景:

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

4. 队列参数说明

参数 作用
exchange 交换机名称
type 交换机类型
durable 是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
autoDelete 是否自动删除,如果没有与之绑定的Queue,直接删除
internal 是否内置的,如果为true,只能通过Exchange到Exchange
arguments 结构化参数

示例:


  
  1. Exchange. DeclareOk exchangeDeclare( String exchange,
  2. String type,
  3. boolean durable,
  4. boolean autoDelete,
  5. boolean internal,
  6. Map< String, Object> arguments) throws IOException;

5. 开发示例

准配虚拟机 开启一个Docker 拉取镜像rabbitmq 运行容器

具体步骤:有道云笔记

需要架包


  
  1. <dependency>
  2. <groupId>org.springframework.boot </groupId>
  3. <artifactId>spring-boot-starter-amqp </artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot </groupId>
  7. <artifactId>spring-boot-starter-web </artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.projectlombok </groupId>
  11. <artifactId>lombok </artifactId>
  12. <optional>true </optional>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.springframework.boot </groupId>
  16. <artifactId>spring-boot-starter-test </artifactId>
  17. <scope>test </scope>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.amqp </groupId>
  21. <artifactId>spring-rabbit-test </artifactId>
  22. <scope>test </scope>
  23. </dependency>

配置文件 application 生产与消费者都可用 端口需要改动 还有RabbitMQ服务地址需要改动


  
  1. server.port= 8081
  2. ## rabbitmq config
  3. spring.rabbitmq.host= 192.168. 164.128
  4. spring.rabbitmq.port= 5672
  5. spring.rabbitmq.username=xhz
  6. spring.rabbitmq.password= 123
  7. spring.rabbitmq.virtual-host=my_vhost
  8. ## 消费者数量
  9. spring.rabbitmq.listener.simple.concurrency= 10
  10. spring.rabbitmq.listener.simple.max-concurrency= 10
  11. #消费者每次从队列中获取的消息数量
  12. spring.rabbitmq.listener.simple.prefetch= 1
  13. #消费者自动启动
  14. spring.rabbitmq.listener.simple.auto-startup= true
  15. #消费失败,自动重新入队
  16. spring.rabbitmq.listener.simple.default-requeue-rejected= true
  17. #启用发送重试
  18. spring.rabbitmq.template. retry.enabled= true
  19. spring.rabbitmq.template. retry.initial-interval= 1000
  20. spring.rabbitmq.template. retry.max-attempts= 3
  21. spring.rabbitmq.template. retry.max-interval= 10000
  22. spring.rabbitmq.template. retry.multiplier= 1.0

完成演示图

所有消费消息类 模块

5.1 Direct交换机

1)配置直接交换机,队列,并将直接交换机和该队列绑定。(在RabbitMQConfig类中配置,该类使用了@Configuration注解)


  
  1. package com. rabbitmq. provider. rabbitmqprovider. config;
  2. import org. springframework. amqp. core. Binding;
  3. import org. springframework. amqp. core. BindingBuilder;
  4. import org. springframework. amqp. core. DirectExchange;
  5. import org. springframework. amqp. core. Queue;
  6. import org. springframework. context. annotation. Bean;
  7. import org. springframework. context. annotation. Configuration;
  8. @Configuration
  9. public class DirectConfig {
  10. @Bean
  11. public Queue directQueue( ){
  12. return new Queue( "zking-direct-queue");
  13. }
  14. @Bean
  15. public DirectExchange directExchange( ){
  16. return new DirectExchange( "zking-direct-exchange");
  17. }
  18. @Bean
  19. public Binding directBinding( ){
  20. return BindingBuilder. bind( directQueue()). to( directExchange()). with( "zking-direc");
  21. }
  22. }

2)编写通过直接交换机发送消息的方法


  
  1. package com.rabbitmq.provider.rabbitmqprovider.web;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @RestController
  11. public class SenderController {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @RequestMapping("/sendDirect")
  15. public String sendDirect (String routing){
  16. Map msg= new HashMap<>();
  17. msg.put( "code", 200);
  18. msg.put( "msg", "this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
  19. .ofPattern( "yyyy-MM-dd HH:mm:ss")));
  20. rabbitTemplate.convertAndSend( "zking-direct-exchange",routing,msg);
  21. return "direct success";
  22. }
  23. }

3.测试交换机发送消息

http://localhost:8081/sendDirect?routing=zking-direc

4.消费消息

创建模块

消费消息 我们运行这个项目


  
  1. package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit. annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit. annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.Map;
  7. @Component
  8. @Slf4j
  9. //queues参数指定的是与直接交换机关联的队列名称
  10. @RabbitListener(queues = "zking-direct-queue")
  11. public class DirecReciewer {
  12. @RabbitHandler
  13. public void receive(Map msg) {
  14. log.info( "接收通过直接交换机发送的消息: " + msg);
  15. }
  16. }

打印结果

5.2 主题交换机

1) 配置主题交换机,队列,并将主题交换机和该队列绑定。

第一种方式 ---选一种即可


  
  1. package com.rabbitmq.provider.rabbitmqprovider.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory. annotation.Qualifier;
  4. import org.springframework.context. annotation.Bean;
  5. import org.springframework.context. annotation.Configuration;
  6. //@Configuration
  7. public class TopicConfig {
  8. /**
  9. * 声明Topic类型的交换机,支持序列化,后面队列进行绑定(topic_queue_q1,topic_queue_q2)
  10. * @return
  11. */
  12. @Bean(name="topicExchange")
  13. public Exchange topicExchange() {
  14. return ExchangeBuilder
  15. .topicExchange( "topic_exchange")
  16. .durable( true)
  17. .build();
  18. }
  19. /**
  20. * 声明队列,该队列与topic交换机绑定
  21. * @return
  22. */
  23. @Bean(name="topicQueue1")
  24. public Queue topicQueue1() {
  25. return QueueBuilder.durable( "topic_queue_q1").build();
  26. }
  27. /**
  28. * 声明队列,该队列与topic交换机绑定
  29. * @return
  30. */
  31. @Bean(name="topicQueue2")
  32. public Queue topicQueue2() {
  33. return QueueBuilder.durable( "topic_queue_q2").build();
  34. }
  35. /**
  36. * 将队列(topic_queue_q1)与topic型交换机进行绑定
  37. * @param queue
  38. * @param exchange
  39. * @return
  40. */
  41. @Bean
  42. public Binding topicBindingQ1(
  43. @Qualifier("topicQueue1") Queue queue,
  44. @Qualifier("topicExchange") Exchange exchange) {
  45. return BindingBuilder
  46. .bind(queue)
  47. .to(exchange)
  48. .with( "topic.queue.#")
  49. .noargs();
  50. }
  51. /**
  52. * 将队列(topic_queue_q2)与topic型交换机进行绑定
  53. * @param queue
  54. * @param exchange
  55. * @return
  56. */
  57. @Bean
  58. public Binding topicBindingQ2(
  59. @Qualifier("topicQueue2") Queue queue,
  60. @Qualifier("topicExchange") Exchange exchange) {
  61. return BindingBuilder
  62. .bind(queue)
  63. .to(exchange)
  64. .with( "topic.queue.#")
  65. .noargs();
  66. }
  67. }

 测试 发送消息


  
  1. package com.rabbitmq.provider.rabbitmqprovider;
  2. import org.junit.jupiter.api.Test;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @SpringBootTest
  11. class RabbitmqProviderApplicationTests {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @Test
  15. void contextLoads () {
  16. Map msg= new HashMap<>();
  17. msg.put( "code", 200);
  18. msg.put( "msg", "this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
  19. .ofPattern( "yyyy-MM-dd HH:mm:ss")));
  20. rabbitTemplate.convertAndSend( "topic_exchange", "topic.queue.ab",msg);
  21. }
  22. }

第二种发送消息


  
  1. package com .rabbitmq .provider .rabbitmqprovider .config;
  2. import org .springframework .amqp .core.*;
  3. import org .springframework .beans .factory .annotation .Qualifier;
  4. import org .springframework .context .annotation .Bean;
  5. import org .springframework .context .annotation .Configuration;
  6. @ Configuration
  7. public class TopicConfig1 {
  8. @Bean(name= "topicQueue1")
  9. public Queue topicQueue1() {
  10. return QueueBuilder .durable( "topic_queue_q1") .build();
  11. }
  12. @ Bean(name= "topicQueue2")
  13. public Queue topicQueue2() {
  14. return QueueBuilder .durable( "topic_queue_q2") .build();
  15. }
  16. @ Bean
  17. public TopicExchange topicExchange(){
  18. return new TopicExchange( "topic-exchange");
  19. }
  20. @ Bean
  21. public Binding topicBinding1( @Qualifier( "topicQueue1") Queue queue,
  22. @Qualifier( "topicExchange") TopicExchange exchange){
  23. return BindingBuilder .bind(queue) .to(exchange) .with( "person.yy");
  24. }
  25. @ Bean
  26. public Binding topicBinding2( @Qualifier( "topicQueue2") Queue queue,
  27. @Qualifier( "topicExchange") TopicExchange exchange){
  28. return BindingBuilder .bind(queue) .to(exchange) .with( "person.*");
  29. }
  30. }

测试发送消息


  
  1. package com.rabbitmq.provider.rabbitmqprovider.web;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @RestController
  11. public class SenderController {
  12. @RequestMapping("/sendTopic")
  13. public String sendTopic (String routing){
  14. Map msg= new HashMap<>();
  15. msg.put( "code", 200);
  16. msg.put( "msg", "this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
  17. .ofPattern( "yyyy-MM-dd HH:mm:ss")));
  18. rabbitTemplate.convertAndSend( "topic-exchange",routing,msg);
  19. return "direct success";
  20. }
  21. }

http://localhost:8081/sendTopic?routing=person.y 只有条件为 y 或者 *

http://localhost:8081/sendTopic?routing=person.yy

消费消息


  
  1. package com. zking. rabbitmq. consumer. rabbitmqconsumer. component;
  2. import org. springframework. amqp. rabbit. annotation. RabbitHandler;
  3. import org. springframework. amqp. rabbit. annotation. RabbitListener;
  4. import org. springframework. stereotype. Component;
  5. import java. util. Map;
  6. @Component
  7. public class TopicReciewer {
  8. @RabbitListener(queues={ "topic_queue_q1"})
  9. @RabbitHandler
  10. public void handler( Map map){
  11. System. out. println(map);
  12. }
  13. @RabbitListener(queues={ "topic_queue_q2"})
  14. @RabbitHandler
  15. public void handler1( Map map){
  16. System. out. println(map);
  17. }
  18. }

 

 

5.3 广播交换机 (扇形)

1)配置广播交换机,队列,并将主题交换机和该队列绑定。


  
  1. package com .rabbitmq .provider .rabbitmqprovider .config;
  2. import org .springframework .amqp .core.*;
  3. import org .springframework .beans .factory .annotation .Qualifier;
  4. import org .springframework .context .annotation .Bean;
  5. import org .springframework .context .annotation .Configuration;
  6. @ Configuration
  7. public class FanoutConfig {
  8. @Bean
  9. public Queue fanoutQueue1() {
  10. return new Queue( "fanout-queue1");
  11. }
  12. @ Bean
  13. public Queue fanoutQueue2() {
  14. return new Queue( "fanout-queue2");
  15. }
  16. @ Bean
  17. public Queue fanoutQueue3() {
  18. return new Queue( "fanout-queue3");
  19. }
  20. @ Bean
  21. public FanoutExchange fanoutExchange() {
  22. return new FanoutExchange( "fanout-exchange");
  23. }
  24. @ Bean
  25. public Binding fanoutBInding1( @Qualifier( "fanoutQueue1") Queue queue,
  26. @Qualifier( "fanoutExchange") FanoutExchange exchange) {
  27. return BindingBuilder .bind(queue) .to(exchange);
  28. }
  29. @ Bean
  30. public Binding fanoutBInding2( @Qualifier( "fanoutQueue2") Queue queue,
  31. @Qualifier( "fanoutExchange") FanoutExchange exchange) {
  32. return BindingBuilder .bind(queue) .to(exchange);
  33. }
  34. @ Bean
  35. public Binding fanoutBInding3( @Qualifier( "fanoutQueue3") Queue queue,
  36. @Qualifier( "fanoutExchange") FanoutExchange exchange) {
  37. return BindingBuilder .bind(queue) .to(exchange);
  38. }
  39. }

 

生产的队列

向服务器发送消息


  
  1. package com.rabbitmq.provider.rabbitmqprovider.web;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import java.time.LocalDateTime;
  7. import java.time.format.DateTimeFormatter;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @RestController
  11. public class SenderController {
  12. @Autowired
  13. private RabbitTemplate rabbitTemplate;
  14. @RequestMapping("/sendFanout")
  15. public String sendFanout (){
  16. Map msg= new HashMap<>();
  17. msg.put( "code", 200);
  18. msg.put( "msg", "this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
  19. .ofPattern( "yyyy-MM-dd HH:mm:ss")));
  20. rabbitTemplate.convertAndSend( "fanout-exchange", null,msg);
  21. return "direct success";
  22. }
  23. }

 

发送与消费

消费消息


  
  1. package com. zking. rabbitmq. consumer. rabbitmqconsumer. component;
  2. import org. springframework. amqp. rabbit. annotation. RabbitHandler;
  3. import org. springframework. amqp. rabbit. annotation. RabbitListener;
  4. import org. springframework. stereotype. Component;
  5. import java. util. Map;
  6. @Component
  7. public class FanoutRecevier {
  8. @RabbitListener(queues={ "fanout-queue1"})
  9. @RabbitHandler
  10. public void fanout( Map map){
  11. System. out. println(map);
  12. }
  13. @RabbitListener(queues={ "fanout-queue2"})
  14. @RabbitHandler
  15. public void fanout1( Map map){
  16. System. out. println(map);
  17. }
  18. @RabbitListener(queues={ "fanout-queue3"})
  19. @RabbitHandler
  20. public void fanout2( Map map){
  21. System. out. println(map);
  22. }
  23. }

 

 


转载:https://blog.csdn.net/qq_62898618/article/details/128474747
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场