0x00 为什么使用MQTT
消息队列遥测传输MQTT(Message Queuing Telemetry Transport)是从消息队列MQ(MessageQueue,传统的Publish/Subscribe订阅模型)演变而来,但具有面向物联网应用的特点设计。
从设计目的上讲,MQTT面向物联网,充分理解这种由于成本带来的糟糕带宽/低下性能。断线重连是基操,保证消息的投递/重试是标准,客户端提前立遗嘱以便于掉线处理后事是专属设计。
从使用特点上讲,MQTT与微服务密不可分。MQTT服务器端(代理/Broker)运行之后成为一个单独的服务接入点,需要为之分配处理程序资源,MQTT接入的流量可解码打包直接丢给Kafka,在解耦了各应用各语言的同时还能保持良好的扩展性。
如果你有以上两个痛点需求,那么你需要的正是MQTT,而不是Netty或其他纯TCP框架。
0x01 MQTT概念与构成元素
https://blog.csdn.net/qq_28877125/article/details/78325003
从接入上看,MQTT由服务端、发布端和订阅端组成,符合传统的MQ结构。
0x02 MQTT使用
MQTT本身是一种通信协议,如果想要实际使用,需要选择实现的载体(即服务端/代理端/Broker),市面上有许多实现的实例,各有优缺点。
https://www.jianshu.com/p/cf91f4bea071
博主自测了两款实现:Mosquitto和EMQx。测试使用如下代码运行,分别扮演客户端和发布端。
-
# sub.py 订阅端
-
import paho.mqtt.client
as mqtt
-
-
def on_connect(client, userdata, flags, rc):
-
print(
"Connected with result code: " + str(rc))
-
-
def on_message(client, userdata, msg):
-
print(msg.topic +
" " + str(msg.payload))
-
-
def on_disconnect(client, userdata, rc):
-
print(
"Disconnect with result code: " + str(rc))
-
-
# client = mqtt.Client(client_id, transport='tcp')
-
client = mqtt.Client()
-
client.username_pw_set(username=
'admin', password=
'123456')
-
client.on_connect = on_connect
-
client.on_message = on_message
-
client.on_disconnect = on_disconnect
-
client.connect(host=
'127.0.0.1', port=
1883, keepalive=
600)
-
client.subscribe(
'fifa', qos=
1)
-
client.loop_forever()
-
# pub.py 发布端
-
import paho.mqtt.client
as mqtt
-
-
def on_connect(client, userdata, flags, rc):
-
print(
"Connected with result code: " + str(rc))
-
-
def on_message(client, userdata, msg):
-
print(msg.topic +
" " + str(msg.payload))
-
-
client = mqtt.Client()
-
client.username_pw_set(username=
'admin', password=
'123456')
-
client.on_connect = on_connect
-
client.on_message = on_message
-
client.connect(host=
'127.0.0.1', port=
1883, keepalive=
600)
-
client.publish(
'fifa', payload=
'amazing', qos=
1)
Mosquitto是由C/C++实现的一种极简MQTT实现,主要面向嵌入式。
以上是mosquitto-1.6.9-install-windows-x64全貌,从文件结构上也能体会到mosquitto的简洁性。
-
# 启动mosquitto, -v选项能够看到Broker详细输出
-
mosquitto -c mosquitto.conf -v
实际测试发现mosquitto目前仍然处于开发阶段,qos收发策略测试都不能完全进行。原因在于配置文件817行cleansession false注释一旦取消再次启动就报错。如果项目很小,也比较简单可以考虑mosquitto,流量大需要做集群负载均衡等就不推荐了,本身是不支持的。
-
# Set the clean session variable for this bridge.
-
# When set to true, when the bridge disconnects for any reason, all
-
# messages and subscriptions will be cleaned up on the remote
-
# broker. Note that with cleansession set to true, there may be a
-
# significant amount of retained messages sent when the bridge
-
# reconnects after losing its connection.
-
# When set to false, the subscriptions and messages are kept on the
-
# remote broker, and delivered when the bridge reconnects.
-
#cleansession false
EMQx Broker是由EMQ公司推出的开源MQTT Broker,支持分布式集群,限流限速,共享订阅等诸多便捷功能。
在企业级应用中经常需要均衡承载各种流量/转发等,EMQx都有推出对应的连接插件,可以说胜任各种场景均没问题。
https://docs.emqx.io/broker/latest/cn/
-
D:\Temp\emqx-windows-v4.0.6\emqx\bin\emqx.cmd
-
usage: emqx (
install|
uninstall|
start|
stop|restart|console|ping|
list|attach)
-
-
# 启动EMQx,由于使用ErLang开发,此处可能需要提前安装ErLang
-
emqx
start
-
-
# 访问Dashboard网页控制面板
-
http://
127.0
.0
.1:
18083/
-
admin
-
public
-
再次用之前的python发布端/订阅端测试,基本表现与mosquitto一致,但无疑支持的内容更多。
0x03 结语
至此已建立MQTT基本使用模型,往后的插件协作与经验分享有机会再补充。
Mosquitto和EMQx的配置可参照其他文章。
转载:https://blog.csdn.net/DXCyber409/article/details/105727506