飞道的博客

在鸿蒙系统上使用MQTT编程

327人阅读  评论(0)

我们使用的是paho mqtt软件包,这里介绍一下怎么使用mqtt协议编程。关于鸿蒙系统的mqtt移植好的软件包,相关github链接如下:

https://gitee.com/qidiyun/harmony_mqtt 

这里提供一个简单的编程示例:

这里我们使用MQTTClient编程模型,他支持多任务多线程,非常适合用在鸿蒙系统上。

 

1. 网络初始化

这里定义一个 Network 结构体,然后指定我们的MQTT服务器的IP和端口号。


  
  1. Network n;
  2. //初始化结构体
  3. NetworkInit(&n);
  4. //连接到指定的MQTT服务器IP、端口号
  5. NetworkConnect(&n, “XXX.XXX.XXX.XXX”, XXXX);

 

2. 设置MQTT缓存和启动MQTT线程

我们这里使用的是MQTT线程功能。


  
  1. MQTTClientInit( &c, &n, 1000, buf, 100, readbuf, 100) ;
  2. MQTTStartTask( &c) ;

 

3. 设置MQTT相关参数

接下来我们设置MQTT的相关参数,包括版本号、客户端ID、账户密码等


  
  1. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  2. data.willFlag = 0;
  3. // MQTT版本为 v3
  4. data.MQTTVersion = 3;
  5. //设置客户端 ID
  6. data.clientID.cstring = opts.clientid;
  7. //设置客户端账户
  8. data.username.cstring = opts.username;
  9. //设置客户端密码
  10. data.password.cstring = opts.password;
  11. data.keepAliveInterval = 10;
  12. data.cleansession = 1;
  13. //连接到 MQTT服务器
  14. rc = MQTTConnect(&c, & data);

 

4. 订阅主题和接收消息

订阅主题可以使用如下函数

MQTTSubscribe(&c, topic, opts.qos, messageArrived);

它的函数原型如下:

DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);

其中:

MQTTClient* c :我们前面定义的MQTTClient结构体

const char* topicFilter:订阅的主题

messageHandler messageHandler :接收到主题信息后的回调处理函数。

例如上面我们的回调函数是  messageArrived ,它的原型如下:


  
  1. void messageArrived(MessageData* md)
  2. {
  3. MQTTMessage* message = md->message;
  4. //打印接收到的消息的长度、和消息内容
  5. printf( "%.*s", ( int)message->payloadlen, ( char*)message->payload);
  6. }

 

5. 发送消息

发送消息也比较简单,我们只需要设置好我们的主题和消息内容即可


  
  1. memset(&pubmsg, '\0', sizeof(pubmsg));
  2. //消息内容为 hello harmonyOS !
  3. pubmsg.payload = ( void*) "hello harmonyOS !";
  4. //消息长度
  5. pubmsg.payloadlen = strlen(( char*)pubmsg.payload);
  6. pubmsg.qos = QOS0;
  7. pubmsg.retained = 0;
  8. pubmsg.dup = 0;
  9. //推送消息,主题为 pubtest
  10. MQTTPublish(&c, "pubtest", &pubmsg);

 

完整源码如下:


  
  1. #include <stdio.h>
  2. #include <unistd.h>
  3. #include "ohos_init.h"
  4. #include "cmsis_os2.h"
  5. #include <unistd.h>
  6. #include "hi_wifi_api.h"
  7. //#include "wifi_sta.h"
  8. #include "lwip/ip_addr.h"
  9. #include "lwip/netifapi.h"
  10. #include "lwip/sockets.h"
  11. #include "MQTTClient.h"
  12. /**
  13. * MQTT URI farmat:
  14. * domain mode
  15. * tcp://iot.eclipse.org:1883
  16. *
  17. * ipv4 mode
  18. * tcp://192.168.10.1:1883
  19. * ssl://192.168.10.1:1884
  20. *
  21. * ipv6 mode
  22. * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
  23. * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
  24. */
  25. #define MQTT_URI "tcp://106.13.62.194:1883"
  26. struct opts_struct
  27. {
  28. char* clientid;
  29. int nodelimiter;
  30. char* delimiter;
  31. enum QoS qos;
  32. char* username;
  33. char* password;
  34. char* host;
  35. int port;
  36. int showtopics;
  37. } opts =
  38. {
  39. ( char*) "stdout-subscriber", 0, ( char*) "\n", QOS2, NULL, NULL, ( char*) "106.13.62.194", 1883, 1
  40. };
  41. void messageArrived(MessageData* md)
  42. {
  43. MQTTMessage* message = md->message;
  44. if (opts.showtopics)
  45. printf( "%.*s\t", md->topicName->lenstring.len, md->topicName->lenstring.data);
  46. if (opts.nodelimiter)
  47. printf( "%.*s", ( int)message->payloadlen, ( char*)message->payload);
  48. else
  49. printf( "%.*s%s", ( int)message->payloadlen, ( char*)message->payload, opts.delimiter);
  50. //fflush(stdout);
  51. }
  52. unsigned char buf[ 100];
  53. unsigned char readbuf[ 100];
  54. int mqtt_test(void)
  55. {
  56. int rc = 0;
  57. MQTTMessage pubmsg;
  58. char* topic = "test";
  59. if ( strchr(topic, '#') || strchr(topic, '+'))
  60. opts.showtopics = 1;
  61. if (opts.showtopics)
  62. printf( "topic is %s\n", topic);
  63. Network n;
  64. MQTTClient c;
  65. NetworkInit(&n);
  66. NetworkConnect(&n, opts.host, opts.port);
  67. MQTTClientInit(&c, &n, 1000, buf, 100, readbuf, 100);
  68. MQTTStartTask(&c);
  69. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  70. data.willFlag = 0;
  71. data.MQTTVersion = 3;
  72. data.clientID.cstring = opts.clientid;
  73. data.username.cstring = opts.username;
  74. data.password.cstring = opts.password;
  75. data.keepAliveInterval = 10;
  76. data.cleansession = 1;
  77. printf( "Connecting to %s %d\n", opts.host, opts.port);
  78. rc = MQTTConnect(&c, &data);
  79. printf( "Connected %d\n", rc);
  80. printf( "Subscribing to %s\n", topic);
  81. rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
  82. printf( "Subscribed %d\n", rc);
  83. memset(&pubmsg, '\0', sizeof(pubmsg));
  84. pubmsg.payload = ( void*) "hello harmonyOS !";
  85. pubmsg.payloadlen = strlen(( char*)pubmsg.payload);
  86. pubmsg.qos = QOS0;
  87. pubmsg.retained = 0;
  88. pubmsg.dup = 0;
  89. while ( 1)
  90. {
  91. MQTTPublish(&c, "pubtest", &pubmsg);
  92. sleep( 1);
  93. }
  94. printf( "Stopping\n");
  95. MQTTDisconnect(&c);
  96. NetworkDisconnect(&n);
  97. return 0;
  98. }

转载:https://blog.csdn.net/aa120515692/article/details/110087293
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场