标签 mqtt 下的文章

“”

mosquitto 使用-q来设置qos参数

-q : quality of service level to use for the subscription. Defaults to 0.
--will-qos : QoS level for the client Will.

注意事项:
If a message is published with QoS 2 and the client is subscribed to a topic with QoS 0, then the message will be delivered to that client with QoS 0;

if a subscriber is registered with QoS 2 and the message is published with QoS 0, the message will be delivered with QoS 0.

MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(Quality of Service):

QoS0,At most once,至多一次;
QoS1,At least once,至少一次;
QoS2,Exactly once,确保只有一次。
QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:

QoS0 代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
QoS1 代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
QoS2 代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
注意:
QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscibe的时候和Broker协商的QoS等级。

MQTT QoS 等级的选择
QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别。

以下情况下可以选择 QoS 0
可以接受消息偶尔丢失。
在同一个子网内部的服务间的消息交互,或其他客户端与服务端网络非常稳定的场景。
以下情况下可以选择 QoS 1
对系统资源消耗较为关注,希望性能最优化。
消息不能丢失,但能接受并处理重复的消息。
以下情况下可以选择 QoS 2
不能忍受消息丢失(消息的丢失会造成生命或财产的损失),且不希望收到重复的消息。
数据完整性与及时性要求较高的银行、消防、航空等行业。

服务器端要求双向验证

mosquitto -c etc/mosquitto.conf

使用sub订阅时出现错误

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt -d

Client (null) sending CONNECT
Error: host name verification failed.
OpenSSL Error[0]: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed
Error: A TLS error occurred.

在服务器端看到标题所示提示信息
1626420060: New connection from 127.0.0.1:37766 on port 8883.
1626420060: OpenSSL Error[0]: error:14094438:SSL routines:ssl3_read_bytes:tlsv1 alert internal error
1626420060: Client <unknown> disconnected: Protocol error.
1626420068: New connection from 127.0.0.1:37768 on port 8883.
1626420068: OpenSSL Error[0]: error:14094438:SSL routines:ssl3_read_bytes:tlsv1 alert internal error
1626420068: Client <unknown> disconnected: Protocol error.

修改sub订阅中,加上--insecure,错误信息就变了

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt -d --insecure

Client (null) sending CONNECT
OpenSSL Error[0]: error:1409445C:SSL routines:ssl3_read_bytes:tlsv13 alert certificate required
Error: The connection was lost.
服务器端日志
1626420113: New connection from 127.0.0.1:37770 on port 8883.
1626420113: OpenSSL Error[0]: error:1417C0C7:SSL routines:tls_process_client_certificate:peer did not return a certificate
1626420113: Client <unknown> disconnected: Protocol error.

这个是提示没有提供客户端的证书,因为服务器采取的双向认证嘛。

加上客户端证书与密钥的命令

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt --key client.key --cert client.crt -d --insecure

Client (null) sending CONNECT
Client (null) received CONNACK (0)
Client (null) sending SUBSCRIBE (Mid: 1, Topic: const.net.cn, QoS: 0, Options: 0x00)
Client (null) received SUBACK
Subscribed (mid: 1): 0
完美解决问题。

https://snowyang.com/2020/12/11/Network/MQTT/mosquitto/也有提到这个问题。作者还推荐了一个单片机的实现mqtts的方案(mbedtls+lwip+mqtt)
https://mcuoneclipse.com/2017/04/23/tuturial-mbedtls-sll-certificate-verification-with-mosquitto-lwip-and-mqtt/

最后给上参数说明

--insecure : do not check that the server certificate hostname matches the remote
              hostname. Using this option means that you cannot be sure that the
              remote host is the server you wish to connect to and so is insecure.
              Do not use this option in a production environment.

下面是一个Will Message的示例:

Sub端clientid=sub预定义遗嘱消息:

mosquitto_sub --will-topic test --will-payload offline --will-qos 2 -t topic -i sub -h 192.168.1.1

客户端 clientid=alive 在 192.168.1.1 订阅will主题

mosquitto_sub -t test -i alive -q 2 -h 192.168.1.1

当断开client sub端与Server端连接,alive收到offline的消息。

参数说明:

--will-payload : payload for the client Will, which is sent by the broker in case of
                  unexpected disconnection. If not given and will-topic is set, a zero
                  length message will be sent.
 --will-qos : QoS level for the client Will.
 --will-retain : if given, make the client Will retained.
 --will-topic : the topic on which to publish the client Will.

-t : mqtt topic to subscribe to. May be repeated multiple times.
-i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.
-h : mqtt host to connect to. Defaults to localhost.

使用mosquitto c++ 实现publish发布消息到broker

file mymosq.h

class myMosq : public mosqpp::mosquittopp
{
private:
 const char     *     host;
 const char    *     id;
 const char    *     topic;
 int                port;
 int                keepalive;

 void on_connect(int rc);
 void on_disconnect(int rc);
 void on_publish(int mid);
public:
 myMosq(const char *id, const char * _topic, const char *host, int port);
 ~myMosq();
 bool send_message(const char * _message);
};

file mymosq.cpp

#include "mymosq.h"
#include <iostream>
myMosq::myMosq(const char * _id,const char * _topic, const char * _host, int _port) : mosquittopp(_id)
 {
 mosqpp::lib_init();        
 this->keepalive = 60;    
 this->id = _id;
 this->port = _port;
 this->host = _host;
 this->topic = _topic;
 connect_async(host,     
 port,
 keepalive);
 loop_start();            
 };
myMosq::~myMosq() {
 loop_stop();            
 mosqpp::lib_cleanup();    
 }
bool myMosq::send_message(const  char * _message)
 {
 int ret = publish(NULL,this->topic,strlen(_message),_message,1,false);
 return ( ret == MOSQ_ERR_SUCCESS );
 }
void myMosq::on_disconnect(int rc) {
 std::cout << ">> myMosq - disconnection(" << rc << ")" << std::endl;
 }
void myMosq::on_connect(int rc)
 {
 if ( rc == 0 ) {
 std::cout << ">> myMosq - connected with server" << std::endl;
 } else {
 std::cout << ">> myMosq - Impossible to connect with server(" << rc << ")" << std::endl;
 }
 }
void myMosq::on_publish(int mid)
 {
 std::cout << ">> myMosq - Message (" << mid << ") succeed to be published " << std::endl;
 }

此处使用同步连接模式来发布消息,mosquitto有同步和异步两种通讯方式。同步的方式是通信+等待的阻塞模式。
官方手册api地址:https://mosquitto.org/api/files/mosquitto-h.html#mosquitto_connect
有疑问的地方,可以直接看官方手册,比较好懂。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"

#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE  512

static int running = 1;

void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: my_connect_callback\n");

}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: my_disconnect_callback\n");
        running = 0;
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
        printf("Call the function: my_publish_callback\n");

}


int main()
{
        int ret;
        struct mosquitto *mosq;
        char buff[MSG_MAX_SIZE];
        
        //初始化libmosquitto库
        ret = mosquitto_lib_init();
        if(ret){
                printf("Init lib error!\n");
                return -1;
        }
        //创建一个发布端实例
        mosq =  mosquitto_new("pub_test", true, NULL);
        if(mosq == NULL){
                printf("New pub_test error!\n");
                mosquitto_lib_cleanup();
                return -1;
        }

        //设置回调函数
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
        mosquitto_publish_callback_set(mosq, my_publish_callback);

        // 连接至服务器
        // 参数:句柄、ip(host)、端口、心跳
        ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
        if(ret){
                printf("Connect server error!\n");
                mosquitto_destroy(mosq);
                mosquitto_lib_cleanup();
                return -1;
        }

        printf("Start!\n");
        
        //mosquitto_loop_start作用是开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息
        
        int loop = mosquitto_loop_start(mosq); 
        if(loop != MOSQ_ERR_SUCCESS)
        {
            printf("mosquitto loop error\n");
            return 1;
        }


        while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
        {
            /*发布消息*/
            mosquitto_publish(mosq,NULL,"topic1",strlen(buff)+1,buff,0,0);
            memset(buff,0,sizeof(buff));
        }

        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        printf("End!\n");

        return 0;
}