一、创建MQTT平台服务
免费创建网址:https://cloud.emqx.cn/
或者使用免费开放的公共服务器:
broker: broker.emqx.io
port: 1883
ClientID: go_mqtt_client
Username: emqx
Password: public
二、实现方式(支持 mqtt、mqtts、ws、wss 协议)
案例1:
package main
import (
"fmt"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Connection settings for this example.
// Port/protocol pairs: 12847 (mqtt), 12173 (mqtts), 8083 (ws), 8084 (wss).
const (
	broker   = "tcp://tee2b1be.cn.emqx.cloud:12847"
	username = "xhcomvip"
	password = ""
	ClientID = "go_mqtt_client"
)
// onMessage is the message callback: it prints the topic and payload of
// every message it is handed.
var onMessage mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	topic, payload := m.Topic(), m.Payload()
	fmt.Printf("[%s] -> %s\n", topic, payload)
}
// Package-level state shared by main and the worker goroutines.
var (
	// wg tracks the subscribe and publish goroutines started by main.
	wg sync.WaitGroup
	// client is the MQTT client; it is assigned by mqttConnect.
	client mqtt.Client
)
// main connects to the broker, then runs the subscriber and the test
// publisher concurrently and waits for both of them.
func main() {
	// Connect to the MQTT server.
	mqttConnect()
	// Disconnect (with a 250 ms quiesce) when main returns.
	defer client.Disconnect(250)

	// Register both workers up front, then launch them.
	wg.Add(2)
	go mqttSubScribe("topic/test")
	go testPublish()

	wg.Wait()
}
// mqttConnect builds the client options from the package constants, stores
// the new client in the package-level client variable and connects it.
//
// It panics if the connection attempt does not complete within the timeout
// or completes with an error, so callers never continue with an
// unconnected client.
func mqttConnect() {
	const connectTimeout = 60 * time.Second

	// Configuration.
	clientOptions := mqtt.NewClientOptions().
		AddBroker(broker).
		SetUsername(username).
		SetPassword(password).
		SetClientID(ClientID).
		SetConnectTimeout(connectTimeout)

	// Connect.
	client = mqtt.NewClient(clientOptions)
	token := client.Connect()

	// WaitTimeout reports false when the token did not complete in time;
	// that case must be treated as a failure rather than skipped.
	if !token.WaitTimeout(connectTimeout) {
		panic(fmt.Sprintf("mqtt: connecting to %s timed out after %s", broker, connectTimeout))
	}
	if err := token.Error(); err != nil {
		panic(err)
	}
}
// mqttSubScribe subscribes the package-level client to topic at QoS 1 with
// onMessage as the callback, then marks its WaitGroup slot as done.
//
// The subscription is issued once; the registered callback is what receives
// messages, so there is no need to re-send SUBSCRIBE in a loop.
// A failed subscription is reported on stdout.
func mqttSubScribe(topic string) {
	defer wg.Done()

	token := client.Subscribe(topic, 1, onMessage)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("subscribe to %s failed: %v\n", topic, err)
	}
}
// testPublish is a test helper: it publishes "TEST" to topic/test every
// 3 seconds so that this program receives its own messages.
func testPublish() {
	defer wg.Done()

	const (
		topic    = "topic/test"
		payload  = "TEST"
		interval = 3 * time.Second
	)
	for {
		client.Publish(topic, 1, false, payload)
		time.Sleep(interval)
	}
}
案例2:
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// messagePubHandler prints the payload and topic of each message it is
// handed. main installs it as the default publish handler.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, topic := m.Payload(), m.Topic()
	fmt.Printf("Received message: %s from topic: %s\n", payload, topic)
}
// connectHandler is the on-connect callback; it only announces the event.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}
// connectLostHandler is the connection-lost callback; it prints the error
// it was given (without a trailing newline).
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	fmt.Printf("Connect lost: %v", cause)
}
// Deployment console: https://cloud.emqx.cn/console/deployments/0?oper=new
//
// ClientOptions: sets the broker, port, client id, username/password, etc.
// messagePubHandler: global handler for published MQTT messages
// connectHandler: callback invoked on connect
// connectLostHandler: callback invoked when the connection is lost
//
// main connects to the broker, subscribes, publishes the test messages and
// finally disconnects.
func main() {
	const (
		host = "broker.emqx.io"
		port = 1883
	)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", host, port))
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("emqx")
	opts.SetPassword("public")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	token := client.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		panic(err)
	}

	sub(client)
	publish(client)

	// Disconnect with a 250 ms quiesce period.
	client.Disconnect(250)
}
// publish sends ten messages ("Message 0" .. "Message 9") to topic/test at
// QoS 0, one per second, waiting for each publish token to complete.
//
// A publish that completes with an error is reported on stdout and the loop
// carries on with the next message.
func publish(client mqtt.Client) {
	const messageCount = 10
	for i := 0; i < messageCount; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("topic/test", 0, false, text)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Printf("Publish of %q failed: %v\n", text, err)
		}
		time.Sleep(time.Second)
	}
}
// sub subscribes client to topic/test at QoS 1. No per-subscription callback
// is passed (nil), so no subscription-specific handler is registered here.
//
// Success is only announced once the subscribe token has completed without
// an error; a failure is reported on stdout instead.
func sub(client mqtt.Client) {
	topic := "topic/test"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic: %s\n", topic)
}
// NewTlsConfig builds the TLS configuration to use for a TLS connection
// with a client certificate.
//
// It reads the CA bundle from "ca.pem" and the client certificate/key pair
// from "client-crt.pem" / "client-key.pem" in the working directory.
// The returned config trusts only the loaded CA and leaves server
// certificate verification enabled, so the CA bundle is actually used.
//
// The process exits via log.Fatalln if ca.pem cannot be read or contains no
// usable certificate; it panics if the client key pair cannot be loaded.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports whether at least one certificate was added;
	// an empty pool would make every handshake fail later with a less
	// helpful error.
	if !certpool.AppendCertsFromPEM(ca) {
		log.Fatalln("ca.pem: no valid PEM certificate found")
	}

	// Import client certificate/key pair.
	clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
	if err != nil {
		panic(err)
	}

	// InsecureSkipVerify is deliberately left false: setting it would skip
	// verification of the server certificate against RootCAs.
	return &tls.Config{
		RootCAs:      certpool,
		Certificates: []tls.Certificate{clientKeyPair},
	}
}
// NewTlsConfigs builds the TLS configuration to use when no client
// certificate is set.
//
// It reads the CA bundle from "ca.pem" in the working directory. The
// returned config trusts only the loaded CA and leaves server certificate
// verification enabled, so the CA bundle is actually used.
//
// The process exits via log.Fatalln if ca.pem cannot be read or contains no
// usable certificate.
func NewTlsConfigs() *tls.Config {
	certpool := x509.NewCertPool()
	ca, err := ioutil.ReadFile("ca.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports whether at least one certificate was added;
	// an empty pool would make every handshake fail later with a less
	// helpful error.
	if !certpool.AppendCertsFromPEM(ca) {
		log.Fatalln("ca.pem: no valid PEM certificate found")
	}

	// InsecureSkipVerify is deliberately left false: setting it would skip
	// verification of the server certificate against RootCAs.
	return &tls.Config{
		RootCAs: certpool,
	}
}
案例3:
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Config carries the command-line settings handed to the connection
// helpers.
type Config struct {
	// Host and Port are formatted into the broker URL.
	Host string
	Port int
	// Action selects the mode: "pub", "sub" or "pubsub".
	Action string
	// Topic is the topic published and/or subscribed to.
	Topic string
	// Username and Password are set on the client options.
	Username string
	Password string
	// Qos is filled from the -qos flag; the Pub/Sub helpers in this file
	// do not read it.
	Qos int
	// Tls is filled from the -tls flag; it is only consulted when the
	// CA certificate path is validated.
	Tls bool
	// CaCert is the path of the CA certificate file read for mqtts/wss.
	CaCert string
}
// Command-line flags; main copies their values into a Config after
// flag.Parse.
var (
	Host     = flag.String("host", "broker.emqx.io", "server hostname or IP")
	Port     = flag.Int("port", 1883, "server port")
	Action   = flag.String("action", "pubsub", "pub/sub/pubsub action")
	Protocol = flag.String("protocol", "mqtt", "mqtt/mqtts/ws/wss")
	Topic    = flag.String("topic", "golang-mqtt/test", "publish/subscribe topic")
	Username = flag.String("username", "emqx", "username")
	Password = flag.String("password", "public", "password")
	Qos      = flag.Int("qos", 0, "MQTT QOS")
	Tls      = flag.Bool("tls", false, "Enable TLS/SSL")
	CaCert   = flag.String("cacert", "./broker.emqx.io-ca.crt", "tls cacert")
)
// main parses the flags, packs them into a Config and hands it to the
// connection routine selected by -protocol. An unknown protocol terminates
// the program via log.Fatalf.
func main() {
	flag.Parse()

	config := Config{
		Host:     *Host,
		Port:     *Port,
		Action:   *Action,
		Topic:    *Topic,
		Username: *Username,
		Password: *Password,
		Qos:      *Qos,
		Tls:      *Tls,
		CaCert:   *CaCert,
	}

	// Protocol name -> routine that connects and runs the chosen action.
	runners := map[string]func(Config){
		"mqtt":  MQTTConnection,
		"mqtts": MQTTSConnection,
		"ws":    WSConnection,
		"wss":   WSSConnection,
	}

	protocol := *Protocol
	run, known := runners[protocol]
	if !known {
		log.Fatalf("Unsupported protocol: %s", protocol)
	}
	run(config)
}
// Pub publishes an increasing counter (1, 2, 3, ...) to topic at QoS 0,
// once per second, logging every payload. It never returns.
func Pub(client mqtt.Client, topic string) {
	for seq := 1; ; seq++ {
		payload := fmt.Sprintf("%d", seq)
		client.Publish(topic, 0, false, payload)
		log.Printf("pub [%s] %s\n", topic, payload)
		time.Sleep(1 * time.Second)
	}
}
// Sub subscribes client to topic at QoS 0 with a callback that logs the
// topic and payload of each message, then blocks forever so the program
// stays alive.
//
// The subscribe token is awaited and checked: a failed subscription
// terminates the program via log.Fatalf instead of idling silently.
func Sub(client mqtt.Client, topic string) {
	token := client.Subscribe(topic, 0, func(_ mqtt.Client, msg mqtt.Message) {
		log.Printf("sub [%s] %s\n", msg.Topic(), string(msg.Payload()))
	})
	token.Wait()
	if err := token.Error(); err != nil {
		log.Fatalf("subscribe to %s failed: %v", topic, err)
	}

	// Keep the calling goroutine alive; messages arrive via the callback.
	for {
		time.Sleep(1 * time.Second)
	}
}
// PubSub runs Sub in a background goroutine and Pub in the calling
// goroutine, both on the same client and topic.
func PubSub(client mqtt.Client, topic string) {
	go func() {
		Sub(client, topic)
	}()
	Pub(client, topic)
}
// connectByMQTT connects over plain TCP ("tcp://host:port") with the
// username and password from config and returns the connected client.
// It waits until the connect token completes and terminates the program
// via log.Fatal if the token carries an error.
func connectByMQTT(config Config) mqtt.Client {
	address := fmt.Sprintf("tcp://%s:%d", config.Host, config.Port)
	opts := mqtt.NewClientOptions().
		AddBroker(address).
		SetUsername(config.Username).
		SetPassword(config.Password)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// Poll in 3-second slices until the token has completed.
	for {
		if token.WaitTimeout(3 * time.Second) {
			break
		}
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
// connectByMQTTS connects over TLS ("ssl://host:port"), trusting the CA
// certificate(s) in the file named by config.CaCert, and returns the
// connected client.
//
// The program is terminated via the log.Fatal family when the CA path is
// empty, the file cannot be read, it contains no usable PEM certificate, or
// the connect token completes with an error.
func connectByMQTTS(config Config) mqtt.Client {
	// The CA file is always read below, so the path is required regardless
	// of the Tls flag.
	if config.CaCert == "" {
		log.Fatalln("CA certificate path (cacert) is required for mqtts")
	}
	ca, err := ioutil.ReadFile(config.CaCert)
	if err != nil {
		log.Fatalln(err.Error())
	}
	certpool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added; an empty
	// pool would only surface later as an opaque handshake failure.
	if !certpool.AppendCertsFromPEM(ca) {
		log.Fatalf("no valid PEM certificate found in %s", config.CaCert)
	}

	opts := mqtt.NewClientOptions()
	broker := fmt.Sprintf("ssl://%s:%d", config.Host, config.Port)
	opts.AddBroker(broker)
	opts.SetUsername(config.Username)
	opts.SetPassword(config.Password)
	opts.SetTLSConfig(&tls.Config{RootCAs: certpool})

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// Block until the connection attempt has completed.
	token.Wait()
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
// connectByWS connects over WebSocket ("ws://host:port/mqtt") with the
// username and password from config and returns the connected client.
// It waits until the connect token completes and terminates the program
// via log.Fatal if the token carries an error.
func connectByWS(config Config) mqtt.Client {
	address := fmt.Sprintf("ws://%s:%d/mqtt", config.Host, config.Port)
	opts := mqtt.NewClientOptions().
		AddBroker(address).
		SetUsername(config.Username).
		SetPassword(config.Password)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// Poll in 3-second slices until the token has completed.
	for {
		if token.WaitTimeout(3 * time.Second) {
			break
		}
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
// connectByWSS connects over secure WebSocket ("wss://host:port/mqtt"),
// trusting the CA certificate(s) in the file named by config.CaCert, and
// returns the connected client.
//
// The program is terminated via the log.Fatal family when the CA path is
// empty, the file cannot be read, it contains no usable PEM certificate, or
// the connect token completes with an error.
func connectByWSS(config Config) mqtt.Client {
	// The CA file is always read below, so the path is required regardless
	// of the Tls flag.
	if config.CaCert == "" {
		log.Fatalln("CA certificate path (cacert) is required for wss")
	}
	ca, err := ioutil.ReadFile(config.CaCert)
	if err != nil {
		log.Fatalln(err.Error())
	}
	certpool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added; an empty
	// pool would only surface later as an opaque handshake failure.
	if !certpool.AppendCertsFromPEM(ca) {
		log.Fatalf("no valid PEM certificate found in %s", config.CaCert)
	}

	opts := mqtt.NewClientOptions()
	broker := fmt.Sprintf("wss://%s:%d/mqtt", config.Host, config.Port)
	opts.AddBroker(broker)
	opts.SetUsername(config.Username)
	opts.SetPassword(config.Password)
	opts.SetTLSConfig(&tls.Config{RootCAs: certpool})

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// Block until the connection attempt has completed.
	token.Wait()
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
// MQTTSConnection connects via connectByMQTTS and then runs the action
// named by config.Action ("pub", "sub" or "pubsub") on config.Topic.
// Any other action terminates the program via log.Fatalf.
func MQTTSConnection(config Config) {
	client := connectByMQTTS(config)

	// Action name -> routine to run on the connected client.
	actions := map[string]func(mqtt.Client, string){
		"pub":    Pub,
		"sub":    Sub,
		"pubsub": PubSub,
	}
	run, known := actions[config.Action]
	if !known {
		log.Fatalf("Unsupported action: %s", config.Action)
	}
	run(client, config.Topic)
}
// MQTTConnection connects via connectByMQTT and then runs the action named
// by config.Action ("pub", "sub" or "pubsub") on config.Topic.
// Any other action terminates the program via log.Fatalf.
func MQTTConnection(config Config) {
	client := connectByMQTT(config)

	// Action name -> routine to run on the connected client.
	actions := map[string]func(mqtt.Client, string){
		"pub":    Pub,
		"sub":    Sub,
		"pubsub": PubSub,
	}
	run, known := actions[config.Action]
	if !known {
		log.Fatalf("Unsupported action: %s", config.Action)
	}
	run(client, config.Topic)
}
// WSConnection connects via connectByWS and then runs the action named by
// config.Action ("pub", "sub" or "pubsub") on config.Topic.
// Any other action terminates the program via log.Fatalf.
func WSConnection(config Config) {
	client := connectByWS(config)

	// Action name -> routine to run on the connected client.
	actions := map[string]func(mqtt.Client, string){
		"pub":    Pub,
		"sub":    Sub,
		"pubsub": PubSub,
	}
	run, known := actions[config.Action]
	if !known {
		log.Fatalf("Unsupported action: %s", config.Action)
	}
	run(client, config.Topic)
}
// WSSConnection connects via connectByWSS and then runs the action named by
// config.Action ("pub", "sub" or "pubsub") on config.Topic.
// Any other action terminates the program via log.Fatalf.
func WSSConnection(config Config) {
	client := connectByWSS(config)

	// Action name -> routine to run on the connected client.
	actions := map[string]func(mqtt.Client, string){
		"pub":    Pub,
		"sub":    Sub,
		"pubsub": PubSub,
	}
	run, known := actions[config.Action]
	if !known {
		log.Fatalf("Unsupported action: %s", config.Action)
	}
	run(client, config.Topic)
}
参考:https://github.com/emqx/MQTT-Client-Examples
转载:https://blog.csdn.net/qq_32447301/article/details/114277530
查看评论