服务器端要求双向验证

mosquitto -c etc/mosquitto.conf

使用sub订阅时出现错误

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt -d

Client (null) sending CONNECT
Error: host name verification failed.
OpenSSL Error[0]: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed
Error: A TLS error occurred.

在服务器端看到标题所示提示信息
1626420060: New connection from 127.0.0.1:37766 on port 8883.
1626420060: OpenSSL Error[0]: error:14094438:SSL routines:ssl3_read_bytes:tlsv1 alert internal error
1626420060: Client <unknown> disconnected: Protocol error.
1626420068: New connection from 127.0.0.1:37768 on port 8883.
1626420068: OpenSSL Error[0]: error:14094438:SSL routines:ssl3_read_bytes:tlsv1 alert internal error
1626420068: Client <unknown> disconnected: Protocol error.

修改sub订阅中,加上--insecure,错误信息就变了

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt -d --insecure

Client (null) sending CONNECT
OpenSSL Error[0]: error:1409445C:SSL routines:ssl3_read_bytes:tlsv13 alert certificate required
Error: The connection was lost.
服务器端日志
1626420113: New connection from 127.0.0.1:37770 on port 8883.
1626420113: OpenSSL Error[0]: error:1417C0C7:SSL routines:tls_process_client_certificate:peer did not return a certificate
1626420113: Client <unknown> disconnected: Protocol error.

这个是提示没有提供客户端的证书,因为服务器采取的双向认证嘛。

加上客户端证书与密钥的命令

mosquitto_sub -L "mqtts://localhost:8883/const.net.cn" --cafile ca.crt --key client.key --cert client.crt -d --insecure

Client (null) sending CONNECT
Client (null) received CONNACK (0)
Client (null) sending SUBSCRIBE (Mid: 1, Topic: const.net.cn, QoS: 0, Options: 0x00)
Client (null) received SUBACK
Subscribed (mid: 1): 0
完美解决问题。

https://snowyang.com/2020/12/11/Network/MQTT/mosquitto/也有提到这个问题。作者还推荐了一个单片机的实现mqtts的方案(mbedtls+lwip+mqtt)
https://mcuoneclipse.com/2017/04/23/tuturial-mbedtls-sll-certificate-verification-with-mosquitto-lwip-and-mqtt/

最后给上参数说明

--insecure : do not check that the server certificate hostname matches the remote
              hostname. Using this option means that you cannot be sure that the
              remote host is the server you wish to connect to and so is insecure.
              Do not use this option in a production environment.

下面是一个Will Message的示例:

Sub端clientid=sub预定义遗嘱消息:

mosquitto_sub --will-topic test --will-payload offline --will-qos 2 -t topic -i sub -h 192.168.1.1

客户端 clientid=alive 在 192.168.1.1 订阅will主题

mosquitto_sub -t test -i alive -q 2 -h 192.168.1.1

当断开client sub端与Server端连接,alive收到offline的消息。

参数说明:

--will-payload : payload for the client Will, which is sent by the broker in case of
                  unexpected disconnection. If not given and will-topic is set, a zero
                  length message will be sent.
 --will-qos : QoS level for the client Will.
 --will-retain : if given, make the client Will retained.
 --will-topic : the topic on which to publish the client Will.

-t : mqtt topic to subscribe to. May be repeated multiple times.
-i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.
-h : mqtt host to connect to. Defaults to localhost.

使用mosquitto c++ 实现publish发布消息到broker

file mymosq.h

class myMosq : public mosqpp::mosquittopp
{
private:
 const char     *     host;
 const char    *     id;
 const char    *     topic;
 int                port;
 int                keepalive;

 void on_connect(int rc);
 void on_disconnect(int rc);
 void on_publish(int mid);
public:
 myMosq(const char *id, const char * _topic, const char *host, int port);
 ~myMosq();
 bool send_message(const char * _message);
};

file mymosq.cpp

#include "mymosq.h"
#include <iostream>
myMosq::myMosq(const char * _id,const char * _topic, const char * _host, int _port) : mosquittopp(_id)
 {
 mosqpp::lib_init();        
 this->keepalive = 60;    
 this->id = _id;
 this->port = _port;
 this->host = _host;
 this->topic = _topic;
 connect_async(host,     
 port,
 keepalive);
 loop_start();            
 };
myMosq::~myMosq() {
 loop_stop();            
 mosqpp::lib_cleanup();    
 }
bool myMosq::send_message(const  char * _message)
 {
 int ret = publish(NULL,this->topic,strlen(_message),_message,1,false);
 return ( ret == MOSQ_ERR_SUCCESS );
 }
void myMosq::on_disconnect(int rc) {
 std::cout << ">> myMosq - disconnection(" << rc << ")" << std::endl;
 }
void myMosq::on_connect(int rc)
 {
 if ( rc == 0 ) {
 std::cout << ">> myMosq - connected with server" << std::endl;
 } else {
 std::cout << ">> myMosq - Impossible to connect with server(" << rc << ")" << std::endl;
 }
 }
void myMosq::on_publish(int mid)
 {
 std::cout << ">> myMosq - Message (" << mid << ") succeed to be published " << std::endl;
 }

此处使用同步连接模式来发布消息,mosquitto有同步和异步两种通讯方式。同步的方式是通信+等待的阻塞模式。
官方手册api地址:https://mosquitto.org/api/files/mosquitto-h.html#mosquitto_connect
有疑问的地方,可以直接看官方手册,比较好懂。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"

#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE  512

static int running = 1;

void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: my_connect_callback\n");

}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: my_disconnect_callback\n");
        running = 0;
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
        printf("Call the function: my_publish_callback\n");

}


int main()
{
        int ret;
        struct mosquitto *mosq;
        char buff[MSG_MAX_SIZE];
        
        //初始化libmosquitto库
        ret = mosquitto_lib_init();
        if(ret){
                printf("Init lib error!\n");
                return -1;
        }
        //创建一个发布端实例
        mosq =  mosquitto_new("pub_test", true, NULL);
        if(mosq == NULL){
                printf("New pub_test error!\n");
                mosquitto_lib_cleanup();
                return -1;
        }

        //设置回调函数
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
        mosquitto_publish_callback_set(mosq, my_publish_callback);

        // 连接至服务器
        // 参数:句柄、ip(host)、端口、心跳
        ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
        if(ret){
                printf("Connect server error!\n");
                mosquitto_destroy(mosq);
                mosquitto_lib_cleanup();
                return -1;
        }

        printf("Start!\n");
        
        //mosquitto_loop_start作用是开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息
        
        int loop = mosquitto_loop_start(mosq); 
        if(loop != MOSQ_ERR_SUCCESS)
        {
            printf("mosquitto loop error\n");
            return 1;
        }


        while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
        {
            /*发布消息*/
            mosquitto_publish(mosq,NULL,"topic1",strlen(buff)+1,buff,0,0);
            memset(buff,0,sizeof(buff));
        }

        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        printf("End!\n");

        return 0;
}

mosquitto有同步和异步两种通讯方式。同步的方式是通信+等待的阻塞模式。
官方手册api地址:https://mosquitto.org/api/files/mosquitto-h.html#mosquitto_connect
有疑问的地方,可以直接看官方手册,比较好懂。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mosquitto.h"

#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE  512

// 定义运行标志决定是否需要结束
static int running = 1;

void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: on_connect\n");

        if(rc){
                // 连接错误,退出程序
                printf("on_connect error!\n");
                exit(1);
        }else{
                // 订阅主题
                // 参数:句柄、id、订阅的主题、qos
                if(mosquitto_subscribe(mosq, NULL, "topic1", 2)){
                        printf("Set the topic error!\n");
                        exit(1);
                }
        }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        printf("Call the function: my_disconnect_callback\n");
        running = 0;
}

void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
        printf("Call the function: on_subscribe\n");
}

void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
        printf("Call the function: on_message\n");
        printf("Recieve a message of %s : %s\n", (char *)msg->topic, (char *)msg->payload);

        if(0 == strcmp(msg->payload, "quit")){
                mosquitto_disconnect(mosq);
        }
}


int main()
{
        int ret;
        struct mosquitto *mosq;

        // 初始化mosquitto库
        ret = mosquitto_lib_init();
        if(ret){
                printf("Init lib error!\n");
                return -1;
        }

        // 创建一个订阅端实例
        // 参数:id(不需要则为NULL)、clean_start、用户数据
        mosq =  mosquitto_new("sub_test", true, NULL);
        if(mosq == NULL){
                printf("New sub_test error!\n");
                mosquitto_lib_cleanup();
                return -1;
        }

        // 设置回调函数
        // 参数:句柄、回调函数
        mosquitto_connect_callback_set(mosq, my_connect_callback);
        mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
        mosquitto_message_callback_set(mosq, my_message_callback);

        // 连接至服务器
        // 参数:句柄、ip(host)、端口、心跳
       ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
        if(ret){
                printf("Connect server error!\n");
                mosquitto_destroy(mosq);
                mosquitto_lib_cleanup();
                return -1;
        }


         // 开始通信:循环执行、直到运行标志running被改变
        printf("Start!\n");
        while(running)
        {
                mosquitto_loop(mosq, -1, 1);
        }

        // 结束后的清理工作
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        printf("End!\n");

        return 0;
}