小言_互联网的博客

rabbitMQ linux安装部署+spring融入+SSM调用+单元测试收发

489人阅读  评论(0)

rabbitMQ linux安装部署+spring融入+SSM调用+单元测试收发

更新日期:2020-3-18 chen

一、软件环境

  1. centos6
  2. erlang-solutions-1.0-1.noarch.rpm
  3. rabbitmq-server-3.6.1-1.noarch.rpm

二、安装过程

  1. 安装运行环境erlang

    rpm -ivh erlang-solutions-1.0-1.noarch.rpm
    
  2. 安装rabbitmq-server

    rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm 
    
  3. 修改配置文件注释

    cd /usr/share/doc/rabbitmq-server-3.6.1/
    cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    cd /etc/rabbitmq/
    
    vi rabbitmq.config 
    
    #修改64行
    :set number
     64    %% {loopback_users, []},
     #修改为 {loopback_users, []}
    
  4. 配置控制台

    rabbitmq-plugins enable rabbitmq_management
    

    The following plugins have been enabled:
    mochiweb
    webmachine
    rabbitmq_web_dispatch
    amqp_client
    rabbitmq_management_agent
    rabbitmq_management

    Applying plugin configuration to rabbit@bogon… failed.

    • Could not contact node rabbit@bogon.
      Changes will take effect at broker restart.

    • Options: --online - fail if broker cannot be contacted.
      –offline - do not try to contact broker.

  5. 启动

    service rabbitmq-server start
    

    Starting rabbitmq-server: SUCCESS

    其他命令:

    service rabbitmq-server   start
    service rabbitmq-server   stop
    service rabbitmq-server   status
    service rabbitmq-server   rotate-logs|
    service rabbitmq-server   restart
    service rabbitmq-server   condrestart
    service rabbitmq-server   try-restart
    service rabbitmq-server   reload
    service rabbitmq-server   force-reload
    ps -ef | grep rabbitmq      #查看rabbitMq进程
    netstat -anplt | grep LISTEN  rabbitmq      #默认监听端口15672/5672
    

    报错解决:

    Starting rabbitmq-server: FAILED - check /var/log/rabbitmq/startup_{log, _err}
    rabbitmq-server.
    
    #解决:rabbitmq默认用户是bogon,修改hosts文件,
    vi /etc/hosts	
    #加入一行	
    127.0.0.1 bogon
    
  6. 打开端口

    /sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
    /sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
    /etc/rc.d/init.d/iptables save
    /etc/init.d/iptables status
    

    Chain INPUT (policy ACCEPT)
    num target prot opt source destination
    1 ACCEPT tcp – 0.0.0.0/0 0.0.0.0/0 tcp dpt:5672
    2 ACCEPT tcp – 0.0.0.0/0 0.0.0.0/0 tcp dpt:15672

  7. 设置开机启动

    chkconfig rabbitmq-server on
    

    You have new mail in /var/spool/mail/root

  8. 安装完成

三、web控制台设置

  1. http://192.168.8.91:15672

  2. 默认账号密码为guest guest,

  3. 添加管理员用户 chenadmin chen

  4. 添加虚拟主机 /chen

  5. 给虚拟主机添加管理用户权限

  6. 添加权限成功

  7. 使用新管理员chenadmin登录

  8. web控制台配置完成

四、Java SSM Spring框架整合,选用了路由模式

  1. maven依赖

    <!-- 消息队列 -->
    <dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>3.5.1</version>
    </dependency>
    <dependency>
           <groupId>org.springframework.amqp</groupId>
           <artifactId>spring-rabbit</artifactId>
           <version>1.4.0.RELEASE</version>
    </dependency>
    
  2. rabbitmq.properties

    rabbit.ip=192.168.8.91
    rabbit.port=5672
    rabbit.username=chenadmin
    rabbit.password=chen
    rabbit.vhost=/chen
    
  3. 发送端配置

    1) 整合配置文件applicationContext-rabbitmq-send.xml

    <<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    	http://www.springframework.org/schema/beans
    	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    
    	<!-- 定义RabbitMQ的连接工厂 -->
    	<rabbit:connection-factory id="connectionFactory"
    		host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}"
    		password="${rabbit.password}" virtual-host="${rabbit.vhost}" />
    
    	<!-- 定义Rabbit模板,指定连接工厂以及定义exchange,可修改exchange名称 -->
    	<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
    		exchange="chen-itemDirectExchange" />
    
    	<!-- MQ的管理,包括队列、交换器等 -->
    	<rabbit:admin connection-factory="connectionFactory" />
    
    	<!-- 定义交换器,自动声明,durable持久化 -->
    	<rabbit:direct-exchange name="chen-itemDirectExchange"
    		auto-declare="true" durable="true">
    	</rabbit:direct-exchange>
    
    </beans>
    
    

    2) spring隐含注入一个类RabbitTemplate, 直接在servcie中注入rabbitTemplate发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;
    		
    String routingKey = "item.update";
    	//rabbitMQ send 
    rabbitTemplate.convertAndSend(routingKey, item.getId());
    
  4. 接收端配置:接收端只是利用spring容器来启动它的侦听,本质就是一个单独的线程。

  1. applicationContext-rabbitmq-receive.xml,配置侦听(事件)。配置响应类,指定一个方法,消息就作为方法的参数。
 <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

	<!-- 定义RabbitMQ的连接工厂 -->
	<rabbit:connection-factory id="connectionFactory"
		host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
		virtual-host="${rabbit.vhost}" />

	<!-- MQ的管理,包括队列、交换器等 -->
	<rabbit:admin connection-factory="connectionFactory" />
	
	<!-- 定义消息队列:名称可修改-->
	<rabbit:queue name="chen-web.itemQueue" auto-declare="true"/>
	
	<!-- 定义交换机,并且完成队列和交换机的绑定,名称可修改 -->
	<rabbit:direct-exchange name="chen-itemDirectExchange" auto-declare="true">
		<rabbit:bindings>
			<!-- 前台系统只接收商品更新的消息,key路由key -->
			<rabbit:binding queue="chen-web.itemQueue" key="item.update"/>
		</rabbit:bindings>
	</rabbit:direct-exchange>
	
	<!-- 定义监听 -->
	<rabbit:listener-container connection-factory="connectionFactory">
		<!-- 监听一个队列,当队列中有消息,就会自动触发类.方法,传递消息就作为方法的参数,根据方法声明的参数强转 -->
		<rabbit:listener ref="rabbitItemService" method="updateItem" queue-names="chen-web.itemQueue"/>
	</rabbit:listener-container>

</beans>
  1. 配置监听中使用的rabbitItemService类和updateItem方法
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisCluster;

@Service
public class RabbitItemService {
	@Autowired
	private JedisCluster redisService;
	public void updateItem(Long itemId){
		//delete redis data
		redisService.del("ITEM_"+itemId);
	}
}
  1. rabbitMQ 融入spring完成。

五、单元测试RabbitMQ

  1. java 程序测试发送

    package rabbitmq;
    import java.io.IOException;
    import org.junit.Test;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    //消息发送
    public class SendTest {
    	/**
    	 * 步骤: 1. 创建链接,链接到MQ虚拟机,以用户名和密码登录 2. 创建通道Channel 3. 声明或者绑定队列 4. 发送者发送消息到消息队列
    	 */
    	@Test // 发送
    	public void send() throws IOException {
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.8.91");
    		factory.setPort(5672);
    
    		factory.setVirtualHost("/chen");
    		factory.setUsername("chenadmin");
    		factory.setPassword("chen");
    
    		Connection cn = factory.newConnection();
    		Channel chan = cn.createChannel();
    
    		// 声明队列
    		chan.queueDeclare("chen_simple", false, false, false, null);
    
    		// 发送消息
    		String msg = null;
    		for (int i = 0; i < 10000; i++) {
    			msg = "hello" + i;
    			chan.basicPublish("", "chen_simple", null, msg.getBytes());
    		}
    	}
    }
    
    
  2. java 程序测试接收

    package rabbitmq;
    import java.io.IOException;
    import org.junit.Test;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class RabbitMQTestRecv {
    	@Test // 接收消息
    	public void recv() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost("192.168.8.91");
    		factory.setPort(5672);
    		factory.setVirtualHost("/chen");
    		factory.setUsername("chenadmin");
    		factory.setPassword("chen");
    
    		Connection cn = factory.newConnection();
    		Channel chan = cn.createChannel();
    		chan.queueDeclare("chen_simple", false, false, false, null);
    
    		// 创建消费者
    		QueueingConsumer consumer = new QueueingConsumer(chan);
    		// 绑定消费者到消息队列,消费消息,true自动确认(一旦获取,消息就被从队列中移除)
    		chan.basicConsume("chen_simple", true, consumer);
    
    		while (true) { // 利用死循环来模拟侦听
    			// 从消费者对象中获取下一条消息
    			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    			String msg = new String(delivery.getBody());
    			System.out.println(msg);
    		}
    	}
    
    }
    
    

    the end


转载:https://blog.csdn.net/qq_39339588/article/details/104949822
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场