分类 MQTT 下的文章

“MQTT消息队列遥测传输(英语:Message Queuing Telemetry Transport)是ISO 标准(ISO/IEC PRF 20922)下基于发布 (Publish)/订阅 (Subscribe)范式的消息协议,可视为“资料传递的桥梁”它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。”

将二进制数据放入文件中,然后将其作为消息发送:

mosquitto_pub -t test -f file

使用test.mosquitto.org测试数据发送

mosquitto_pub -L mqtt://test.mosquitto.org:1883/LSTS/debug -f doc/buf.bin -d

Client mosq-ZRfknutaDJCvdaPpYI sending CONNECT
Client mosq-ZRfknutaDJCvdaPpYI received CONNACK (0)
Client mosq-ZRfknutaDJCvdaPpYI sending PUBLISH (d0, q0, r0, m1, 'LSTS/debug', ... (35 bytes))
Client mosq-ZRfknutaDJCvdaPpYI sending DISCONNECT

订阅消息

mosquitto_sub -L mqtt://test.mosquitto.org:1883/LSTS/debug -d

Client mosq-Jrrv1j2it7SnqvdTVa sending CONNECT
Client mosq-Jrrv1j2it7SnqvdTVa received CONNACK (0)
Client mosq-Jrrv1j2it7SnqvdTVa sending SUBSCRIBE (Mid: 1, Topic: LSTS/debug, QoS: 0, Options: 0x00)
Client mosq-Jrrv1j2it7SnqvdTVa received SUBACK
Subscribed (mid: 1): 0
Client mosq-Jrrv1j2it7SnqvdTVa received PUBLISH (d0, q0, r0, m0, 'LSTS/debug', ... (35 bytes))

package main

import (
    "fmt"
    "os"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
    Info.Printf("Client connected: %t\n", client.IsConnected())
    // 订阅主题
    if token := client.Subscribe("const.net.cn/#", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
}

var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    Info.Printf("Topic: %s\n", msg.Topic())
    Info.Printf("Message: %s\n", msg.Payload())
}

var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    Info.Println("Client disconnected")
}
var onReconnect mqtt.ReconnectHandler = func(c mqtt.Client, co *mqtt.ClientOptions) {
    Info.Println("Client Reconnect.")
}
    
func MQTT_1883_unencrypted_unauthenticated() {
    opts := mqtt.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883").SetClientID("const.net.cn")

    opts.SetKeepAlive(60 * time.Second)
    // 设置消息回调处理函数
    opts.SetDefaultPublishHandler(onMessage)
    opts.SetCleanSession(true)
    opts.SetAutoReconnect(true)
    opts.SetProtocolVersion(4) //3.1.1
    opts.SetOnConnectHandler(onConnect)
    opts.SetConnectionLostHandler(onDisconnect)
    opts.SetReconnectingHandler(onReconnect)

    opts.SetPingTimeout(1 * time.Second)

    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    time.Sleep(5 * time.Second)
    // 发布消息
    Info.Printf("MQTT Publish")
    token := c.Publish("const.net.cn/1", 0, false, "Hello World")
    token.Wait()

    for {
        Info.Printf("wait.")
        time.Sleep(10 * time.Second)
    }
}

运行结果:
INFO: 2021/09/06 15:05:08.897152 Client connected: true
INFO: 2021/09/06 15:05:13.899218 MQTT Publish
INFO: 2021/09/06 15:05:13.899298 wait.
INFO: 2021/09/06 15:05:15.040248 Topic: const.net.cn/1
INFO: 2021/09/06 15:05:15.040341 Message: Hello World

用户名密码:rw/readwrite

package main

import (
    "fmt"
    "os"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

var onConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
    Info.Printf("Client connected: %t\n", client.IsConnected())
    // 订阅主题
    if token := client.Subscribe("const.net.cn/#", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
}

var onMessage mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    Info.Printf("Topic: %s\n", msg.Topic())
    Info.Printf("Message: %s\n", msg.Payload())
}

var onDisconnect mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    Info.Println("Client disconnected")
}
var onReconnect mqtt.ReconnectHandler = func(c mqtt.Client, co *mqtt.ClientOptions) {
    Info.Println("Client Reconnect.")
}

func MQTT_1884_unencrypted_authenticated() {
    opts := mqtt.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1884").SetClientID("const.net.cn")

    opts.SetKeepAlive(60 * time.Second)
    // 设置消息回调处理函数
    opts.SetDefaultPublishHandler(onMessage)
    opts.SetCleanSession(true)
    opts.SetAutoReconnect(true)
    opts.SetProtocolVersion(4) //3.1.1
    opts.SetOnConnectHandler(onConnect)
    opts.SetConnectionLostHandler(onDisconnect)
    opts.SetReconnectingHandler(onReconnect)
    opts.SetUsername("rw")
    opts.SetPassword("readwrite")

    opts.SetPingTimeout(1 * time.Second)

    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    time.Sleep(5 * time.Second)
    // 发布消息
    Info.Printf("MQTT Publish")
    token := c.Publish("const.net.cn/1", 0, false, "Hello World")
    token.Wait()

    for {
        Info.Printf("wait.")
        time.Sleep(10 * time.Second)
    }
}

运行结果:
INFO: 2021/09/06 15:10:39.721974 Client connected: true
INFO: 2021/09/06 15:10:44.723829 MQTT Publish
INFO: 2021/09/06 15:10:44.724246 wait.
INFO: 2021/09/06 15:10:45.317469 Topic: const.net.cn/1
INFO: 2021/09/06 15:10:45.317498 Message: Hello World
INFO: 2021/09/06 15:10:54.725207 wait.
INFO: 2021/09/06 15:11:04.725945 wait.
INFO: 2021/09/06 15:11:14.726876 wait.
INFO: 2021/09/06 15:11:24.727531 wait.

rabbitmq之一概念解释(信道、交换器和路由键、队列)
update:2021-9-9

一、 channel 信道:

  概念:信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID
,保证了信道私有性,对应上唯一的线程使用。

       疑问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能,每秒成百上千的建立销毁TCP会严重消耗系统。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接)连接到rabbit上。

     类似概念:TCP是电缆,信道就是里面的光纤,每个光纤都是独立的,互不影响。

二、exchange 交换机和绑定routing key

  exchange的作用就是类似路由器,routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去。

  exchange有多个种类:direct,fanout,topic,header(非路由键匹配,功能和direct类似,很少用)。前三种类似集合对应关系那样,(direct)1:1,(fanout)1:N,(topic)N:1

  direct: 1:1类似完全匹配

  fanout:1:N 
可以把一个消息并行发布到多个队列上去,简单的说就是,当多个队列绑定到fanout的交换器,那么交换器一次性拷贝多个消息分别发送到绑定的队列上,每个队列有这个消息的副本。

     ps:这个可以在业务上实现并行处理多个任务,比如,用户上传图片功能,当消息到达交换器上,它可以同时路由到积分增加队列和其它队列上,达到并行处理的目的,并且易扩展,以后有什么并行任务的时候,直接绑定到fanout交换器不需求改动之前的代码。

  topic   N:1 ,多个交换器可以路由消息到同一个队列。根据模糊匹配,比如一个队列的routing key
为*.test ,那么凡是到达交换器的消息中的routing key 后缀.test都被路由到这个队列上。

三、结合信道、交换器和路由键到队列

  总结几点重要知识:1.信道才是rabbit通信本质,生产者和消费者都是通过信道完成消息生产消费的。2.交换器本质是一张路由查询表(名称和队列id,类似于hash表),这是一个虚拟出来的东西,并不存在真实的交换器。

  消息的生命周期:生产者生产消息A
交由信道,信道通过消息(消息由载体和标签)的标签(路由键)放到交换器发送到队列上(其实就是查询匹配,一旦匹配到了规则,信道就直接和队列产生连接,然后将消息发送过去)

Referenced from:https://www.huaweicloud.com/articles/12567586.html

RabbitMQ入门:路由(Routing) update:2021-9-9
fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exchange建立的绑定关系的队列都会接收到消息。但是有一些场景只需要订阅到一部分消息,这个时候就不能使用fanout
类型的exchange了

Direct Exchange(直接路由器) Direct Exchange,通过Routing
Key来决定需要将消息发送到哪个或者哪些队列中。 它是一种完全按照routing key(路由关键字)进行投递的:当消息中的routing
key和队列中的binding key完全匹配时,才进行会将消息投递到该队列中。

Referenced from:https://www.cnblogs.com/sam-uncle/p/9209666.html