RabbitMQ on Linux: installation and deployment, Spring integration, use from SSM, and unit-test send/receive
Updated: 2020-3-18
chen
1. Software environment
- CentOS 6
- erlang-solutions-1.0-1.noarch.rpm
- rabbitmq-server-3.6.1-1.noarch.rpm
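2. Installation
-
Install the Erlang runtime
rpm -ivh erlang-solutions-1.0-1.noarch.rpm
# Note: this package appears to only register the Erlang Solutions yum repository.
# If the erl command is still missing afterwards, Erlang itself may need to be installed with: yum install erlang
-
Install rabbitmq-server
rpm -ivh rabbitmq-server-3.6.1-1.noarch.rpm
-
Uncomment the loopback_users setting in the configuration file
cd /usr/share/doc/rabbitmq-server-3.6.1/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
cd /etc/rabbitmq/
vi rabbitmq.config
# In vi, run :set number to show line numbers, then go to line 64
# Before:
%% {loopback_users, []},
# After (comment marker and trailing comma removed):
{loopback_users, []}
# An empty loopback_users list lets the guest user log in from other hosts, not only from localhost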
-
Enable the management console
rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@bogon... failed.
Could not contact node rabbit@bogon.
Changes will take effect at broker restart.
Options: --online - fail if broker cannot be contacted.
--offline - do not try to contact broker.
The "failed" message only means the broker was not running yet; the plugin is enabled and takes effect once the broker is started.
-
Start the service
service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
Other commands:
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server status
service rabbitmq-server rotate-logs
service rabbitmq-server restart
service rabbitmq-server condrestart
service rabbitmq-server try-restart
service rabbitmq-server reload
service rabbitmq-server force-reload
ps -ef | grep rabbitmq # show the RabbitMQ processes
netstat -anplt | grep LISTEN # default listening ports: 15672 (web console) and 5672 (AMQP)
Troubleshooting:
Starting rabbitmq-server: FAILED - check /var/log/rabbitmq/startup_{log, _err}
rabbitmq-server.
# Fix: the node is named rabbit@bogon, so the hostname bogon must resolve. Edit the hosts file:
vi /etc/hosts
# and add this line
127.0.0.1 bogon
-
Open the ports
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables status
Chain INPUT (policy ACCEPT)
num target prot opt source destination
1 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:5672
2 ACCEPT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpt:15672
-
Enable start on boot
chkconfig rabbitmq-server on
-
Installation complete
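Before moving on, it can help to confirm from the client machine that both ports are actually reachable through the firewall. The following is only a minimal sketch using plain Java sockets; the class name PortCheck and the 2-second timeout are illustrative choices, and the default host is the server address used elsewhere in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable: treat all of these as "not open"
            return false;
        }
    }

    public static void main(String[] args) {
        // Host defaults to the server used in this post; pass another one as the first argument
        String host = args.length > 0 ? args[0] : "192.168.8.91";
        for (int port : new int[] {5672, 15672}) {
            System.out.println(host + ":" + port + " reachable: " + isOpen(host, port, 2000));
        }
    }
}
```

A successful TCP connect only shows that something is listening on the port; it does not prove that the listener is RabbitMQ or that login will succeed.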
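3. Web console setup
-
http://192.168.8.91:15672
-
The default username and password are guest / guest.
-
Add an administrator user: chenadmin, password chen
-
Add a virtual host: /chen
-
Grant the administrator user permissions on the virtual host
-
Permissions added successfully
-
Log in as the new administrator chenadmin
-
Web console configuration complete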
4. Integration with the Java SSM / Spring stack, using the routing pattern (direct exchange)
-
Maven dependencies
<!-- Message queue -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
-
rabbitmq.properties
rabbit.ip=192.168.8.91
rabbit.port=5672
rabbit.username=chenadmin
rabbit.password=chen
rabbit.vhost=/chen
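To make the meaning of these keys concrete, here is a small sketch that loads them with java.util.Properties and assembles the equivalent AMQP URI. The class RabbitProps and the method toAmqpUri are hypothetical names made up for this illustration and are not part of the project. The point it shows is that a virtual host beginning with a slash has to be percent-encoded when written as a URI path.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Properties;

public class RabbitProps {

    /** Builds an AMQP URI from the rabbit.* keys; only the vhost is percent-encoded here. */
    public static String toAmqpUri(Properties p) {
        String vhost;
        try {
            // "/chen" becomes "%2Fchen" so the slash is not read as a path separator
            vhost = URLEncoder.encode(p.getProperty("rabbit.vhost"), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
        return "amqp://" + p.getProperty("rabbit.username") + ":" + p.getProperty("rabbit.password")
                + "@" + p.getProperty("rabbit.ip") + ":" + p.getProperty("rabbit.port") + "/" + vhost;
    }

    public static void main(String[] args) throws IOException {
        // Same content as rabbitmq.properties, inlined so the sketch is self-contained
        String text = "rabbit.ip=192.168.8.91\n"
                + "rabbit.port=5672\n"
                + "rabbit.username=chenadmin\n"
                + "rabbit.password=chen\n"
                + "rabbit.vhost=/chen\n";
        Properties p = new Properties();
        p.load(new StringReader(text));
        System.out.println(toAmqpUri(p)); // amqp://chenadmin:chen@192.168.8.91:5672/%2Fchen
    }
}
```

The sketch assumes the username and password contain no characters that need escaping; in the Spring configuration that follows, the same keys are instead consumed through ${...} placeholders.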
-
Sender configuration
1) Integration configuration file
applicationContext-rabbitmq-send.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    <!-- Define the RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
        host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
        virtual-host="${rabbit.vhost}" />
    <!-- Define the Rabbit template: set the connection factory and the default exchange (the exchange name can be changed) -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="chen-itemDirectExchange" />
    <!-- MQ administration: queues, exchanges, and so on -->
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- Define the exchange: declared automatically, durable -->
    <rabbit:direct-exchange name="chen-itemDirectExchange" auto-declare="true" durable="true">
    </rabbit:direct-exchange>
</beans>
2) The <rabbit:template> element registers a RabbitTemplate bean, so it can be injected directly into a service and used to send messages
@Autowired
private RabbitTemplate rabbitTemplate;

// Inside the service method that updates an item:
String routingKey = "item.update";
// Send to the template's default exchange (chen-itemDirectExchange) with this routing key
rabbitTemplate.convertAndSend(routingKey, item.getId());
-
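Receiver configuration: the receiver simply relies on the Spring container to start its listener, which in essence runs as a separate thread.
- In
applicationContext-rabbitmq-receive.xml
, configure the listener (event). Configure a handler class and name one of its methods; each message is passed to that method as its argument.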
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
    <!-- Define the RabbitMQ connection factory -->
    <rabbit:connection-factory id="connectionFactory"
        host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
        virtual-host="${rabbit.vhost}" />
    <!-- MQ administration: queues, exchanges, and so on -->
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- Define the message queue (the name can be changed) -->
    <rabbit:queue name="chen-web.itemQueue" auto-declare="true"/>
    <!-- Define the exchange and bind the queue to it (the names can be changed) -->
    <rabbit:direct-exchange name="chen-itemDirectExchange" auto-declare="true">
        <rabbit:bindings>
            <!-- The front-end system only receives item-update messages; key is the routing key -->
            <rabbit:binding queue="chen-web.itemQueue" key="item.update"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- Define the listener -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Listen on one queue: when a message arrives, the given method of the given bean is invoked with the message as its argument, converted to the parameter type the method declares -->
        <rabbit:listener ref="rabbitItemService" method="updateItem" queue-names="chen-web.itemQueue"/>
    </rabbit:listener-container>
</beans>
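The binding above is what makes this the routing pattern: a direct exchange hands a message to a queue only when the message's routing key is exactly equal to the binding key. The toy model below illustrates that rule in plain Java. It is not RabbitMQ code, and the class DirectExchangeSketch is invented purely for this explanation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of direct-exchange routing; it does not talk to a broker. */
public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Records that the queue is bound to this exchange with the given key. */
    public void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with routingKey. */
    public List<String> route(String routingKey) {
        // Exact string match only: no wildcards, unlike a topic exchange
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("chen-web.itemQueue", "item.update");
        System.out.println(exchange.route("item.update")); // [chen-web.itemQueue]
        System.out.println(exchange.route("item.delete")); // []
    }
}
```

With the configuration in this post, a message sent with routing key item.update therefore reaches chen-web.itemQueue, while a message sent with any other key is not delivered to that queue.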
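- Implement the
rabbitItemService
class and the updateItem
method referenced by the listener
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisCluster;

// With no explicit name, @Service registers this bean as "rabbitItemService",
// which matches the ref attribute of the listener
@Service
public class RabbitItemService {
    @Autowired
    private JedisCluster redisService;

    // Called by the listener container with the item id sent by the producer
    public void updateItem(Long itemId) {
        // Delete the cached item from Redis so the next read fetches fresh data
        redisService.del("ITEM_" + itemId);
    }
}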
- RabbitMQ is now integrated with Spring.
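One detail worth spelling out is how the Long passed to convertAndSend on the sending side arrives as the Long parameter of updateItem. As far as I know, when no converter is configured Spring AMQP falls back to its SimpleMessageConverter, which uses standard Java serialization for Serializable payloads that are neither String nor byte[]. The sketch below reproduces only that round trip with the JDK; PayloadRoundTrip is a made-up class name and no Spring or RabbitMQ classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class PayloadRoundTrip {

    /** Sender side: turn a Serializable payload into the bytes of a message body. */
    public static byte[] toBytes(Object payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Receiver side: rebuild the object from the message body. */
    public static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(42L);            // what the producer would put on the wire
        Long itemId = (Long) fromBytes(body);  // what the listener method would receive
        System.out.println("ITEM_" + itemId);  // ITEM_42
    }
}
```

A consequence of this default is that sender and receiver must both be Java applications with compatible classes on the classpath; a JSON converter is a common alternative when other consumers are involved.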
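5. Unit-testing RabbitMQ
-
Java test: sending
package rabbitmq;

import java.nio.charset.StandardCharsets;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Message sender
public class SendTest {
    /**
     * Steps:
     * 1. Create a connection to the MQ virtual host, logging in with username and password
     * 2. Create a Channel
     * 3. Declare (or bind) the queue
     * 4. The sender publishes messages to the queue
     */
    @Test
    public void send() throws Exception {
        // "throws Exception" compiles whether or not newConnection() declares
        // TimeoutException in the client version in use
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.8.91");
        factory.setPort(5672);
        factory.setVirtualHost("/chen");
        factory.setUsername("chenadmin");
        factory.setPassword("chen");
        Connection cn = factory.newConnection();
        try {
            Channel chan = cn.createChannel();
            // Declare the queue: not durable, not exclusive, not auto-deleted
            chan.queueDeclare("chen_simple", false, false, false, null);
            // Publish to the default exchange (""), which routes by queue name
            for (int i = 0; i < 10000; i++) {
                String msg = "hello" + i;
                chan.basicPublish("", "chen_simple", null, msg.getBytes(StandardCharsets.UTF_8));
            }
            chan.close();
        } finally {
            // Always release the connection, even if publishing fails
            cn.close();
        }
    }
}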
-
Java test: receiving
package rabbitmq;

import java.nio.charset.StandardCharsets;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class RabbitMQTestRecv {
    @Test
    public void recv() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.8.91");
        factory.setPort(5672);
        factory.setVirtualHost("/chen");
        factory.setUsername("chenadmin");
        factory.setPassword("chen");
        Connection cn = factory.newConnection();
        Channel chan = cn.createChannel();
        chan.queueDeclare("chen_simple", false, false, false, null);
        // Create the consumer. QueueingConsumer is available in the 3.x client used here;
        // it was removed in the 5.x client.
        QueueingConsumer consumer = new QueueingConsumer(chan);
        // Attach the consumer to the queue; true = auto-acknowledge
        // (a message is removed from the queue as soon as it is delivered)
        chan.basicConsume("chen_simple", true, consumer);
        // Endless loop that simulates a listener; stop the test manually
        while (true) {
            // Block until the next message arrives
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(msg);
        }
    }
}
the end
Reposted from: https://blog.csdn.net/qq_39339588/article/details/104949822