1. Exchange作用
在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。
生产者--(创建消息)-->交换机--(路由键)-->队列--(pull/push)-->消费者
2. Exchange的类型
1)直连交换器: Direct Exchange
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key,当消息被发送的时候,需要指定一个binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个binding_key也是支持应用到多个队列中的
什么是路由键?
每个消息都有一个称为路由键(routing key)的属性,它其实就是一个简单的字符串
直连交换机适用场景?
有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
直连交换机不适合的场景
直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,那么直连交换机就不合适了
2)主题交换机: Topic Exchange(发布/订阅)
RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的routing_key定义规则:
交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开,其中:
*表示一个单词
#表示任意数量(零个或多个)单词
示例:
-
Q1: *.TT.*
-
Q2: TT.#
-
-
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
-
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到
3)广播交换机: Fanout Exchange
用于广播消息,将发送到Exchange中的消息发送到与该交换器关联的所有队列中。
3. 死信队列
死信队列用于存储没匹配队列的消息,超时没有被处理的消息,如果没有配置死信队列这些消息会被丢弃。即当出现没有匹配的队列的消息,或是超时的消息则将消息转入到死信队列里去,等待重新处理或人工干预。
死信队列的应用场景:
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
4. 队列参数说明
参数 | 作用 |
---|---|
exchange | 交换机名称 |
type | 交换机类型 |
durable | 是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除 |
autoDelete | 是否自动删除,如果没有与之绑定的Queue,直接删除 |
internal | 是否内置的,如果为true,只能通过Exchange到Exchange |
arguments | 结构化参数 |
示例:
-
Exchange.
DeclareOk
exchangeDeclare(
String exchange,
-
String
type,
-
boolean durable,
-
boolean autoDelete,
-
boolean internal,
-
Map<
String,
Object>
arguments) throws
IOException;
5. 开发示例
准配虚拟机 开启一个Docker 拉取镜像rabbitmq 运行容器
具体步骤:有道云笔记
需要架包
-
<dependency>
-
<groupId>org.springframework.boot
</groupId>
-
<artifactId>spring-boot-starter-amqp
</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot
</groupId>
-
<artifactId>spring-boot-starter-web
</artifactId>
-
</dependency>
-
-
<dependency>
-
<groupId>org.projectlombok
</groupId>
-
<artifactId>lombok
</artifactId>
-
<optional>true
</optional>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot
</groupId>
-
<artifactId>spring-boot-starter-test
</artifactId>
-
<scope>test
</scope>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.amqp
</groupId>
-
<artifactId>spring-rabbit-test
</artifactId>
-
<scope>test
</scope>
-
</dependency>
配置文件 application 生产与消费者都可用 端口需要改动 还有RabbitMQ服务地址需要改动
-
-
server.port=
8081
-
## rabbitmq config
-
spring.rabbitmq.host=
192.168.
164.128
-
spring.rabbitmq.port=
5672
-
spring.rabbitmq.username=xhz
-
spring.rabbitmq.password=
123
-
spring.rabbitmq.virtual-host=my_vhost
-
## 消费者数量
-
spring.rabbitmq.listener.simple.concurrency=
10
-
spring.rabbitmq.listener.simple.max-concurrency=
10
-
#消费者每次从队列中获取的消息数量
-
spring.rabbitmq.listener.simple.prefetch=
1
-
#消费者自动启动
-
spring.rabbitmq.listener.simple.auto-startup=
true
-
#消费失败,自动重新入队
-
spring.rabbitmq.listener.simple.default-requeue-rejected=
true
-
#启用发送重试
-
spring.rabbitmq.template.
retry.enabled=
true
-
spring.rabbitmq.template.
retry.initial-interval=
1000
-
spring.rabbitmq.template.
retry.max-attempts=
3
-
spring.rabbitmq.template.
retry.max-interval=
10000
-
spring.rabbitmq.template.
retry.multiplier=
1.0
完成演示图
所有消费消息类 模块
5.1 Direct交换机
1)配置直接交换机,队列,并将直接交换机和该队列绑定。(在RabbitMQConfig类中配置,该类使用了@Configuration注解)
-
package com.
rabbitmq.
provider.
rabbitmqprovider.
config;
-
-
import org.
springframework.
amqp.
core.
Binding;
-
import org.
springframework.
amqp.
core.
BindingBuilder;
-
import org.
springframework.
amqp.
core.
DirectExchange;
-
import org.
springframework.
amqp.
core.
Queue;
-
import org.
springframework.
context.
annotation.
Bean;
-
import org.
springframework.
context.
annotation.
Configuration;
-
-
-
-
-
@Configuration
-
public
class
DirectConfig {
-
@Bean
-
public
Queue
directQueue(
){
-
return
new
Queue(
"zking-direct-queue");
-
}
-
@Bean
-
public
DirectExchange
directExchange(
){
-
return
new
DirectExchange(
"zking-direct-exchange");
-
}
-
-
@Bean
-
public
Binding
directBinding(
){
-
return
BindingBuilder.
bind(
directQueue()).
to(
directExchange()).
with(
"zking-direc");
-
}
-
-
}
2)编写通过直接交换机发送消息的方法
-
package com.rabbitmq.provider.rabbitmqprovider.web;
-
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import java.time.LocalDateTime;
-
import java.time.format.DateTimeFormatter;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
@RestController
-
public
class
SenderController {
-
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
@RequestMapping("/sendDirect")
-
public String
sendDirect
(String routing){
-
Map msg=
new
HashMap<>();
-
msg.put(
"code",
200);
-
msg.put(
"msg",
"this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
-
.ofPattern(
"yyyy-MM-dd HH:mm:ss")));
-
rabbitTemplate.convertAndSend(
"zking-direct-exchange",routing,msg);
-
return
"direct success";
-
}
-
-
}
3.测试交换机发送消息
http://localhost:8081/sendDirect?routing=zking-direc
4.消费消息
创建模块
消费消息 我们运行这个项目
-
package com.zking.rabbitmq.consumer.rabbitmqconsumer.component;
-
-
import lombok.extern.slf4j.Slf4j;
-
import org.springframework.amqp.rabbit.
annotation.RabbitHandler;
-
import org.springframework.amqp.rabbit.
annotation.RabbitListener;
-
import org.springframework.stereotype.Component;
-
-
import java.util.Map;
-
-
@Component
-
@Slf4j
-
//queues参数指定的是与直接交换机关联的队列名称
-
@RabbitListener(queues = "zking-direct-queue")
-
public
class
DirecReciewer {
-
-
@RabbitHandler
-
public void receive(Map msg) {
-
log.info(
"接收通过直接交换机发送的消息: " + msg);
-
}
-
}
打印结果
5.2 主题交换机
1) 配置主题交换机,队列,并将主题交换机和该队列绑定。
第一种方式 ---选一种即可
-
package com.rabbitmq.provider.rabbitmqprovider.config;
-
-
import org.springframework.amqp.core.*;
-
import org.springframework.beans.factory.
annotation.Qualifier;
-
import org.springframework.context.
annotation.Bean;
-
import org.springframework.context.
annotation.Configuration;
-
-
//@Configuration
-
public
class
TopicConfig {
-
/**
-
* 声明Topic类型的交换机,支持序列化,后面队列进行绑定(topic_queue_q1,topic_queue_q2)
-
* @return
-
*/
-
@Bean(name="topicExchange")
-
public Exchange topicExchange() {
-
-
return ExchangeBuilder
-
.topicExchange(
"topic_exchange")
-
.durable(
true)
-
.build();
-
}
-
-
-
/**
-
* 声明队列,该队列与topic交换机绑定
-
* @return
-
*/
-
@Bean(name="topicQueue1")
-
public Queue topicQueue1() {
-
return QueueBuilder.durable(
"topic_queue_q1").build();
-
}
-
-
-
/**
-
* 声明队列,该队列与topic交换机绑定
-
* @return
-
*/
-
@Bean(name="topicQueue2")
-
public Queue topicQueue2() {
-
return QueueBuilder.durable(
"topic_queue_q2").build();
-
}
-
-
-
/**
-
* 将队列(topic_queue_q1)与topic型交换机进行绑定
-
* @param queue
-
* @param exchange
-
* @return
-
*/
-
@Bean
-
public Binding topicBindingQ1(
-
@Qualifier("topicQueue1") Queue queue,
-
@Qualifier("topicExchange") Exchange exchange) {
-
-
return BindingBuilder
-
.bind(queue)
-
.to(exchange)
-
.with(
"topic.queue.#")
-
.noargs();
-
}
-
-
-
/**
-
* 将队列(topic_queue_q2)与topic型交换机进行绑定
-
* @param queue
-
* @param exchange
-
* @return
-
*/
-
@Bean
-
public Binding topicBindingQ2(
-
@Qualifier("topicQueue2") Queue queue,
-
@Qualifier("topicExchange") Exchange exchange) {
-
-
return BindingBuilder
-
.bind(queue)
-
.to(exchange)
-
.with(
"topic.queue.#")
-
.noargs();
-
}
-
}
测试 发送消息
-
package com.rabbitmq.provider.rabbitmqprovider;
-
-
import org.junit.jupiter.api.Test;
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.boot.test.context.SpringBootTest;
-
-
import java.time.LocalDateTime;
-
import java.time.format.DateTimeFormatter;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
@SpringBootTest
-
class
RabbitmqProviderApplicationTests {
-
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
@Test
-
void
contextLoads
() {
-
Map msg=
new
HashMap<>();
-
msg.put(
"code",
200);
-
msg.put(
"msg",
"this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
-
.ofPattern(
"yyyy-MM-dd HH:mm:ss")));
-
rabbitTemplate.convertAndSend(
"topic_exchange",
"topic.queue.ab",msg);
-
}
-
-
}
第二种发送消息
-
package
com
.rabbitmq
.provider
.rabbitmqprovider
.config;
-
-
import
org
.springframework
.amqp
.core.*;
-
import
org
.springframework
.beans
.factory
.annotation
.Qualifier;
-
import
org
.springframework
.context
.annotation
.Bean;
-
import
org
.springframework
.context
.annotation
.Configuration;
-
-
@
Configuration
-
public
class
TopicConfig1 {
-
-
@Bean(name=
"topicQueue1")
-
public Queue
topicQueue1() {
-
return
QueueBuilder
.durable(
"topic_queue_q1")
.build();
-
}
-
@
Bean(name=
"topicQueue2")
-
public
Queue
topicQueue2() {
-
return
QueueBuilder
.durable(
"topic_queue_q2")
.build();
-
}
-
-
@
Bean
-
public
TopicExchange
topicExchange(){
-
return
new
TopicExchange(
"topic-exchange");
-
}
-
@
Bean
-
public
Binding
topicBinding1(
@Qualifier(
"topicQueue1") Queue queue,
-
@Qualifier(
"topicExchange") TopicExchange exchange){
-
return
BindingBuilder
.bind(queue)
.to(exchange)
.with(
"person.yy");
-
}
-
-
@
Bean
-
public
Binding
topicBinding2(
@Qualifier(
"topicQueue2") Queue queue,
-
@Qualifier(
"topicExchange") TopicExchange exchange){
-
return
BindingBuilder
.bind(queue)
.to(exchange)
.with(
"person.*");
-
}
-
-
}
测试发送消息
-
package com.rabbitmq.provider.rabbitmqprovider.web;
-
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import java.time.LocalDateTime;
-
import java.time.format.DateTimeFormatter;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
@RestController
-
public
class
SenderController {
-
-
@RequestMapping("/sendTopic")
-
public String
sendTopic
(String routing){
-
Map msg=
new
HashMap<>();
-
msg.put(
"code",
200);
-
msg.put(
"msg",
"this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
-
.ofPattern(
"yyyy-MM-dd HH:mm:ss")));
-
rabbitTemplate.convertAndSend(
"topic-exchange",routing,msg);
-
return
"direct success";
-
}
-
-
}
http://localhost:8081/sendTopic?routing=person.y 只有条件为 y 或者 *
http://localhost:8081/sendTopic?routing=person.yy
消费消息
-
package com.
zking.
rabbitmq.
consumer.
rabbitmqconsumer.
component;
-
-
import org.
springframework.
amqp.
rabbit.
annotation.
RabbitHandler;
-
import org.
springframework.
amqp.
rabbit.
annotation.
RabbitListener;
-
import org.
springframework.
stereotype.
Component;
-
-
import java.
util.
Map;
-
-
@Component
-
-
public
class
TopicReciewer {
-
@RabbitListener(queues={
"topic_queue_q1"})
-
@RabbitHandler
-
public
void
handler(
Map map){
-
System.
out.
println(map);
-
-
}
-
-
@RabbitListener(queues={
"topic_queue_q2"})
-
@RabbitHandler
-
public
void
handler1(
Map map){
-
System.
out.
println(map);
-
-
}
-
-
}
5.3 广播交换机 (扇形)
1)配置广播交换机,队列,并将主题交换机和该队列绑定。
-
package
com
.rabbitmq
.provider
.rabbitmqprovider
.config;
-
-
import
org
.springframework
.amqp
.core.*;
-
import
org
.springframework
.beans
.factory
.annotation
.Qualifier;
-
import
org
.springframework
.context
.annotation
.Bean;
-
import
org
.springframework
.context
.annotation
.Configuration;
-
-
@
Configuration
-
public
class
FanoutConfig {
-
-
@Bean
-
public Queue
fanoutQueue1() {
-
return
new
Queue(
"fanout-queue1");
-
}
-
-
@
Bean
-
public
Queue
fanoutQueue2() {
-
return
new
Queue(
"fanout-queue2");
-
}
-
-
@
Bean
-
public
Queue
fanoutQueue3() {
-
return
new
Queue(
"fanout-queue3");
-
}
-
-
@
Bean
-
public
FanoutExchange
fanoutExchange() {
-
return
new
FanoutExchange(
"fanout-exchange");
-
}
-
-
@
Bean
-
public
Binding
fanoutBInding1(
@Qualifier(
"fanoutQueue1") Queue queue,
-
@Qualifier(
"fanoutExchange") FanoutExchange exchange) {
-
return
BindingBuilder
.bind(queue)
.to(exchange);
-
}
-
-
@
Bean
-
public
Binding
fanoutBInding2(
@Qualifier(
"fanoutQueue2") Queue queue,
-
@Qualifier(
"fanoutExchange") FanoutExchange exchange) {
-
return
BindingBuilder
.bind(queue)
.to(exchange);
-
}
-
-
@
Bean
-
public
Binding
fanoutBInding3(
@Qualifier(
"fanoutQueue3") Queue queue,
-
@Qualifier(
"fanoutExchange") FanoutExchange exchange) {
-
return
BindingBuilder
.bind(queue)
.to(exchange);
-
}
-
}
生产的队列
向服务器发送消息
-
package com.rabbitmq.provider.rabbitmqprovider.web;
-
-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
import org.springframework.web.bind.annotation.RestController;
-
-
import java.time.LocalDateTime;
-
import java.time.format.DateTimeFormatter;
-
import java.util.HashMap;
-
import java.util.Map;
-
-
@RestController
-
public
class
SenderController {
-
-
@Autowired
-
private RabbitTemplate rabbitTemplate;
-
-
@RequestMapping("/sendFanout")
-
public String
sendFanout
(){
-
Map msg=
new
HashMap<>();
-
msg.put(
"code",
200);
-
msg.put(
"msg",
"this message send at time :"+ LocalDateTime.now().format(DateTimeFormatter
-
.ofPattern(
"yyyy-MM-dd HH:mm:ss")));
-
rabbitTemplate.convertAndSend(
"fanout-exchange",
null,msg);
-
return
"direct success";
-
}
-
}
发送与消费
消费消息
-
package com.
zking.
rabbitmq.
consumer.
rabbitmqconsumer.
component;
-
-
import org.
springframework.
amqp.
rabbit.
annotation.
RabbitHandler;
-
import org.
springframework.
amqp.
rabbit.
annotation.
RabbitListener;
-
import org.
springframework.
stereotype.
Component;
-
-
import java.
util.
Map;
-
-
@Component
-
public
class
FanoutRecevier {
-
-
@RabbitListener(queues={
"fanout-queue1"})
-
@RabbitHandler
-
public
void
fanout(
Map map){
-
System.
out.
println(map);
-
-
}
-
-
@RabbitListener(queues={
"fanout-queue2"})
-
@RabbitHandler
-
public
void
fanout1(
Map map){
-
System.
out.
println(map);
-
-
}
-
@RabbitListener(queues={
"fanout-queue3"})
-
@RabbitHandler
-
public
void
fanout2(
Map map){
-
System.
out.
println(map);
-
-
}
-
}
转载:https://blog.csdn.net/qq_62898618/article/details/128474747