小言_互联网的博客

RabbitMQ

295人阅读  评论(0)

1.MQ基础
2.常见消息模型
3.消息的可靠性
4.消息持久化
5.消费者确认机制ACK
6.死信交换机
7.延时交换机
8.消息堆积
9.MQ集群

基础

同步通讯:

  • 优点
    • 时效性更强,可以立刻获得结果
  • 问题
    • 耦合度高
    • 性能吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

异步通信

  • 优点
    • 耦合度低
    • 吞吐量提升
    • 故障隔离
    • 流量削峰
  • 缺点
    • 依赖于Broker的可靠性、安全性、吞吐能力
    • 架构复杂,没有明显流程线,不好追踪问题

安装

  • systemctl start docker # 启动docker服务

  • docker load -i mq.tar # 上传mq,并且安装

  • 运行MQ
    docker run
    -e RABBITMQ_DEFAULT_USER=xiaowang
    -e RABBITMQ_DEFAULT_PASS=123321
    -v mq-plugins:/plugins
    –name mq
    –hostname mq
    -p 15672:15672
    -p 5672:5672
    -d
    rabbitmq:3.8-management

  • 通过ip地址 + 端口访问 http://ip:15672/

简介

消息发送者将消息发送交换机,交换机将消息路由交给队列,队列存储消息,消费者从队列获取消息,处理消息

常见消息模型

基本消息队列

发送者将信息发出,接收者创建队列查看是否存在当前队列,定义回调函数,将函数与队列做绑定,一般队列有消息,就会调用回调函数

发送者

public void testSendMessage() throws IOException, TimeoutException {
   
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }

 

接收者

  public static void main(String[] args) throws IOException, TimeoutException {
   
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
   
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
   
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }

 

SpringAMQP

引入依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置AMQP地址

spring:
  rabbitmq:
    host:  # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: xiaowang # 用户名
    password: 123321 # 密码

发送者使用RabbitTemplate 发送队列消息
只能发送在已存在的队列

public class SpringAmpqpTest {
   

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue(){
   
        String queueName = "simple.queue";
        String message = "Hello,Spring amqp !!!!!";
        rabbitTemplate.convertAndSend(queueName,message);
    }

}

监听队列的消息
一旦接收到队列,该队列就会被删除

    @RabbitListener(queues = "simple.queue")
    public void ListenterSimpleQuery(String message){
   
        System.out.println("消费者接收到发送者的消息:"+message);
    }

工作队列 WorkQueue

两个消费者同时处理请求,

spring:
  rabbitmq:
    host: 192.168.13.133 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: xiaowang # 用户名
    password: 123321 # 密码
    listener:
      simple:
        prefetch: 1  # 每次只能获取到一条消息,处理完才能获取第二条消息

交换机 ->发布/订阅模型

发布者发布消息到队列中,消费者获取到这条队列,队列种的数据就会删除.如果不想让队列删除,增加交换机,绑定队列,发布者发送至交换机,由交换机发布到每一个队列中

  • 交换机绑定队列
@Configuration
public class FanoutConfig {
   

    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
   
        return new FanoutExchange("itcast.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
   
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
   
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
   
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
   
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }


}

 
  • 消费者
    @RabbitListener(queues = "fanout.queue1")
    public void ListenterSimpleQuery1(String message) throws InterruptedException {
   
        Thread.sleep(20);
        System.out.println("消费者接收到发送者的消息1:"+message+"------"+LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void ListenterSimpleQuerytwo2(String message) throws InterruptedException {
   
        Thread.sleep(200);
        System.out.println("消费者接收到发送者的消息2:"+message+"-------"+LocalTime.now());
    }
  • 发布者
    // 发送到交换机
    @Test
    public void testSend(){
   
        String switchName = "itcast.fanout" ;
        String message = "hello.every one !!!!" ;
        rabbitTemplate.convertAndSend(switchName , "",message);
    }

路由模式 Direct交换机

发布者指定交换机,以及发送的key,对应的消费者key,就可以接收到
消费者

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1") ,   //队列
            exchange = @Exchange(name = "itcast.direct"),
            key = {
   "red","blue"}
    ))
    public void ListeDirectQueue1(String msg){
   
        System.out.println("消费者1发送的消息是"+msg);
    }
    
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2") ,   //队列
            exchange = @Exchange(name = "itcast.direct"),//交换机
            key = {
   "red","yellow"} //指定key 访问哪个key就可以发送到哪一个队列
    ))
    public void ListeDirectQueue2(String msg){
   
        System.out.println("消费者2发送的消息是"+msg);
    }

 

发布者

    // 发送到交换机
    @Test
    public void testSend2(){
   
        String switchName = "itcast.direct" ;
        String message = "hello.every red !!!!" ;
        rabbitTemplate.convertAndSend(switchName , "red",message);
    }

Topic交换机 支持通配符

生产者发送的消息,消费者通过通配符的形式查看是否符合

消费者

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1") ,   //队列
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),//交换机
            key = {
   "china.#"} //指定key 访问哪个key就可以发送到哪一个队列
    ))
    public void ListeTopQueue1(String msg){
   
        System.out.println("消费者china1发送的消息是"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2") ,   //队列
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),//交换机
            key = {
   "#.news"} //指定key 访问哪个key就可以发送到哪一个队列
    ))
    public void ListeTopQueue2(String msg){
   
        System.out.println("消费者china2发送的消息是"+msg);
    }

 

生产者

    // 发送到交换机
    @Test
    public void testTop1(){
   
        String switchName = "itcast.topic" ;
        String message = "hello.every China !!!!" ;
        rabbitTemplate.convertAndSend(switchName , "china.news",message);
    }

    @Test
    public void testTop2(){
   
        String switchName = "itcast.topic" ;
        String message = "hello.天气 GOOd" ;
        rabbitTemplate.convertAndSend(switchName , "china.weather",message);
    }

消息转换器

引入jackson依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

声明转化json的bean

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

发送定义转化json

    @Bean
    public MessageConverter messageConverter(){
   
        return new Jackson2JsonMessageConverter();//转json
    }

可以显示中文

    @Test
    public void testTop3(){
   
        Map<String,Object> map =new HashMap<>();
        map.put("你好",123);
        map.put("hello123",123);
        rabbitTemplate.convertAndSend( "object.queue",map);
    }

消费者监听消息转化为中文

@Bean
public MessageConverter jsonMessageConverter(){
   
    return new Jackson2JsonMessageConverter();
}

消费者监听消息

    @RabbitListener(queues = "object.queue")
    public void Test(Map<String,Object> map) {
   
        System.out.println("接收到object消息"+map);
    }

mq消息的可靠性

消息丢失

  • mq生产者将消息发送的时候,可能造成数据的丢失
  • 消息在mq存储的时候,可能会造成数据的丢失
  • 消费者在消费消息时,可能会造成数据的丢失

生产者确认机制,会发送是否发送到交换机的通知

1.生产者设置文件
2.设置交换机/队列/并绑定

3.判断消息是否发送到交换机

消息没有发送都队列中

  • 实现步骤:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration  // 生产者确认机制,确认是否发送到交换机
public class CommonConfig implements ApplicationContextAware {
   


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
   
        // 从IOC容器中,获取template对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 执行发送者的回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
   
            /**
             * @param correlationData  // 自定义的数据 消息UUID
             * @param ack // 确认是否发送到交换机中  true是发送到交换机中
             * @param cause // 原因 , 没有发送到交换机中的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   

                log.info("发送确认回调触发,当前消息的id{}",correlationData.getId());

                if (ack){
   
                    log.info("消息已成功发入交换机中----------------");
                }else {
   
                    log.error("消息没有发送到交换机中!!!!!!!!!  原因未{}" ,  cause);
                    //  这边进行业务重新发送
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
   
            /**
             *  这个方法触发,代表消息没能正确的路由到队列,被mq返回回来了
             * @param message  返回的消息对象
             * @param replyCode   // 回复的状态编码
             * @param replyTest // 回复的内容
             * @param exchange // 交换机
             * @param routingKey // 路由key
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyTest, String exchange, String routingKey) {
   
                log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                        replyCode,replyTest,exchange,routingKey,message.toString());
                //  这边进行业务重新发送
            }
        });
    }


    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public DirectExchange simpleExchange(){
   
        return new DirectExchange("simple.direct",false ,false);
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue simpleQueue(){
   
        return new Queue("fanout.queue" ,false);
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding binding(){
   
        return BindingBuilder.bind(simpleQueue()).to(simpleExchange()).with("simple");
    }
}


 

消息持久化

  • 交换机与队列是否持久化
  • 发送的消息是否持久化

消费者确认机制

  • 只有消费者确认了,这条消息才会从rabbit MQ中删除
  • 有三种模式

自动ack的重试策略 -> 根据设定的最大次数重置后,如果还没有读取到这条消息,这条消息就会丢失

消费者重复失败策略,重试耗尽后,将消息投递到指定交换机->修复问题: 根据设定的最大次数重置后,如果还没有读取到这条消息,这条消息就会丢失


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommonConfig {
   

    // 设置交换机并绑定队列
    @Bean
    public DirectExchange errorMessageExchange(){
   
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
   
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
   
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    // 失败策略
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
   
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

}


 

死信交换机

延时交换机

  • 下载插件

    声明方式 1

    声明方式 2

消息堆积

处理消息堆积的方法

惰性队列存储更大量的数据,普通消息队列存储在内存上,向硬盘缓存同步,惰性队列直接存储在硬盘上

特点

实战

集群


转载:https://blog.csdn.net/qq_48690030/article/details/128033515
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场