标签 zmq 下的文章

“”

zmq介绍
ØMQ (也拼写作ZeroMQ,0MQ或ZMQ)是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字风格的API。

ZeroMQ是由iMatix公司和大量贡献者组成的社群共同开发的。 ZMQ 是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核。

libzmq 下载地址

https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz

zmq github地址

https://github.com/zeromq/libzmq/

cppzmq github地址

https://github.com/zeromq/cppzmq

cppzmq 下载地址

https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz

cppzmq 介绍

cppzmq is a C++ binding for libzmq. It has the following design goals:

cppzmq maps the libzmq C API to C++ concepts. In particular:
it is type-safe (the libzmq C API exposes various class-like concepts as void*)
it provides exception-based error handling (the libzmq C API provides errno-based error handling)
it provides RAII-style classes that automate resource management (the libzmq C API requires the user to take care to free resources explicitly)
cppzmq is a light-weight, header-only binding. You only need to include the header file zmq.hpp (and maybe zmq_addon.hpp) to use it.
zmq.hpp is meant to contain direct mappings of the abstractions provided by the libzmq C API, while zmq_addon.hpp provides additional higher-level abstractions.

ubuntu 下编译 libzmq

wget https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz
tar xvf v4.3.4.tar.gz
cd libzmq-4.3.4/
./autogen.sh
mkdir build && cd build
cmake ..
make

cmake的时候提示没有libsodium,安装就好。

sudo apt install libsodium-dev

指定安装路径

cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..

make install 即可。

ubuntu 下编译cppzmq

wget https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz
tar xvf v4.8.1.tar.gz
cd cppzmq-4.8.1/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..
make install

Zmq通信场景

  • 线程之间(inproc)
  • 进程之间(ipc)
  • 机器之间(tcp)

Zmq通信模式

  • 请求-回复(Request-reply)。分为ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER
    REQ-REP模式是阻塞式的,也就是说必须要client先发送一条消息给server,然后server才可以返回一个response给client。任何顺序上的错误都会导致报错。

a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。

b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

c) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。

d) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。

  • 发布-订阅(Publish-subscribe)。分为ZMQ_PUB、ZMQ_SUB
    发布者只是绑定了端口,并进行信息发布,其并不care是否有接收者,有哪些接收者。

要说明的两点就是:

  1. 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。

注意这个slow joiner问题,之后会为了解决这个问题而设计新的模式。

2.但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。

  • 管道(Pipeline)。分为ZMQ_PUSH、ZMQ_PULL
    管道模式(Pipeline) 这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine 比较适合于这种场景。Pipeline的原理就是:有一个Publisher来发布任务,这些任务是可以平行执行的。有一批Worker用于接收任务,Worker处理完任务之后就将结果发送到Sink之中用于归总或进一步处理。

所以要明确的是Pipeline之中并不是服务器,客户端的关系了,而是有三种对象——Ventilator,Worker,Sink
task ventilator 使用的是bind SOCKET_PUSH,将任务分发到 Worker 节点上。而 Worker 节点上,使用 connect SOCKET_PULL 从上游接受任务,并使用connect SOCKET_PUSH 将结果汇集到bind Slink。

  • 对立对(Exclusive pair)。分为ZMQ_PAIR

libzmq 交叉编译

./configure --host=arm-linux-gnueabihf --without-libsodium --prefix=$(pwd)/../install.arm --disable-libbsd

zmq_proxy_steerable 说明

int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);

第四个参数“control”。如果控制socket不为NULL,这个代理支持后续的控制操作。如果这个socket接收到SUSPEND0消息,此代理将延迟它的活动。如果接到了RESUME0消息,它将继续工作。如果收到了TERMINATE0消息,它将平滑的(smoothly)结束。如果接到了STATISTICS0消息 代理将回复控制套接字,发送一个8帧的多部分消息,每个消息具有64位的无符号整数,其顺序如下:前端套接字接收的消息数===>前端套接字接收的字节数===>发送给前端套接字的消息数量===>前端套接字发送的字节数===>后端套接字接收的消息数===>后端套接字接收的字节数===>发送后端套接字的消息数===>发送后端套接字的字节数。
如果控制socket为NULL,此函数和zmq_proxy的工作方式一样。
当控制socket接收到TERMINATE时zmq_proxy_steerable()函数返回0,。否则,返回 -1,并且设置errno为ETERM。
创建一个共享的代理队列

//  Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);
//  Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
//  Start the queue proxy, which runs until ETERM or "TERMINATE" received on the control socket zmq_proxy (frontend, backend, NULL, control);

在另一个节点上创建一个控制器,进程或者其它

void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);
// stop the proxy 
assert (zmq_send (control, "STOP", 5, 0) == 0);
// resume the proxy
assert (zmq_send (control, "RESUME", 7, 0) == 0);
// terminate the proxy
assert (zmq_send (control, "TERMINATE", 10, 0) == 0);

zmq router dealer 介绍

“请求-响应”代理使用ZMQ_ROUTER、ZMQ_DEALER
ZMQ_DEALER
ZMQ_DEALER类型的套接字是用于扩展“请求/应答”套接字的高级模式
发送消息时:当ZMQ_DEALER套接字由于已达到所有对等点的最高水位而进入静音状态时,或者如果根本没有任何对等点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或至少一个对等方变得可以发送;消息不会被丢弃
接收消息时:发送的每条消息都是在所有连接的对等方之间进行轮询,并且收到的每条消息都是从所有连接的对等方进行公平排队的
将ZMQ_DEALER套接字连接到ZMQ_REP套接字时,发送的每个消息都必须包含一个空的消息部分,定界符以及一个或多个主体部分
REQ和ROUTER交流,DEALER与REP交流。

我们的代理必须是非阻塞的,可以使用zmq_poll()来轮询任何一个套接字上的活动,但我们不能使用REQ-REQ。幸运地是,有两个称为DEALER和ROUTER的套接字,它们能我们可以执行无阻塞的请求-响应

客户端的流程如下:将REQ套接字连接到代理的ROUTER节点上,向ROUTER节点发送“Hello”,接收到“World”的回复
服务端的流程如下:将REP套接字连接到代理的DEALER节点上
代理端的流程如下:
创建一个ROUTER套接字与客户端相连接,创建一个DEALER套接字与服务端相连接
ROUTER套接字从客户端接收请求数据,并把请求数据发送给服务端
DEALER套接字从服务端接收响应数据,并把响应数据发送给客户端
客户端的消息是有顺序到达客户端的,消息会自动进行排队

ZMQ_ROUTER 兼容的对等套接字 ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
ZMQ_DEALER 兼容的对等套接字 ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER

“请求-响应模型”支持的套接字类型有4种:
ZMQ_REP
ZMQ_REQ
ZMQ_DEALER
ZMQ_ROUTER

php zmq

安装 sudo apt install php-zm

php zmq 示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PUB);
$queue->connect("tcp://127.0.0.1:5554");

while (1) {
  $queue->send("hello there!");
}

php zmq req示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");

/* Connect to an endpoint */
$queue->connect("tcp://127.0.0.1:5555");

/* send and receive */
var_dump($queue->send("hello there, using socket 1")->recv());

?>

php zmq req 非阻塞示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");
$queue->connect("tcp://127.0.0.1:5555");

/* Assign socket 1 to the queue, send and receive */
$retries = 5;
$sending = true;
$sent    = false;

$queue->setSockOpt (ZMQ::SOCKOPT_LINGER, 1000);

echo "Trying to send message\n";
do {
    try {
        if ($sending) {
            if ($queue->send("This is a message", ZMQ::MODE_DONTWAIT) !== false) {
                echo "Message sent\n";
                $sent    = true;
                $sending = false;
            }
        }
    } catch (ZMQSocketException $e) {
        die(" - Error: " . $e->getMessage());
    }
    usleep (1000);
} while (1 && --$retries);

$retries = 2;
$receiving = true;
$received  = false;

echo "Trying to receive message\n";
do {
    try {
        if ($receiving) {
            $message = $queue->recv (ZMQ::MODE_DONTWAIT);
            
            if ($message) {
                echo "Received message: " . $message . "\n";
                $receiving = false;
                $received = true;
            }
        }
    } catch (ZMQSocketException $e) {
        die(" - Error: " . $e->getMessage());
    }
    sleep (1);
} while (1 && --$retries);

if (!$received) {
    echo "The receive timed out\n";
}

?>

更多php-zmq示例
https://github.com/zeromq/php-zmq/tree/master/examples

ZeroMQ设置超时等待

设置一下超时等待,避免阻塞。

使用方法:

int timeout = 1000;
zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));

php zmq pipe line 示例
https://www.cnblogs.com/jkko123/p/6294575.html
pipeline中,send.php通过把累加任务分发给50个worker节点计算,然后worker节点计算完成后,把结果发送给result.php进行统一的汇总。

sender:

$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->bind("tcp://127.0.0.1:8881");

worker:

$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$rev->connect("tcp://127.0.0.1:8881"); 
$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->connect("tcp://127.0.0.1:8882");

result:

$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); 
$rev->bind("tcp://127.0.0.1:8882");

C/C++使用ZeroMQ的Router/Dealer模式搭建高性能后台服务框架
ROUTER/DEALER的优点
没错,就是简单使用ZeroMQ提供的ROUTER/DEALER组合模式,可以轻松搭建一个高性能异步的C/C++后台服务框架。ROUTER可 以高效的接收客户端的请求,而DEALER可以负载均衡的调度后端Worker工作。当客户端的请求特别多,后端Worker处理不过来,需要增加 Worker的时候,也非常简单,新加入的Worker直接Connect到DEALER即可。如此运维起来也非常高效,后端可以非常简单的横向扩展!值的一提的是,ROUTER又叫做XREP,DEALER又叫做XREQ。

Dealer将后端Worker的应答数据转发到Router。
然后由Router寻址将应答数据准确的传递给对应的client。
值得注意的是,Router对client的寻址方式,得看client的‘身份’。
临时身份的client,Router会为其生成一个uuid进行标识。
永久身份的client,Router直接使用该client的身份。
ZMQ_REP套接字,其实就是一个“应答”,即,把应答数据回复给ZMQ_REQ,他们是严格的一问一答的方式。不过组合上 ZMQ_ROUTER,ZMQ_DEALER模式后,后台Worker不再是服务的死穴,可以通过横向扩展多个Worker来提高处理ZMQ_REQ的能 力。

ZMQ_REQ是“问”的套接字,它需要“答”套接字(ZMQ_REP)。不过目前ZMQ_REP已经被 ROUTER/DEALER华丽的包装成高富帅了。ZMQ_REQ只能通过联系ROUTER,然后由这个ROUTER/DEALER组成的DEVICE帮 忙传递“爱意”到达ZMQ_REP了,然后再默默的期待ROUTER传递ZMQ_REQ的“答复”。

ZeroMQ的ROUTER/DEALER模式

1、客户端(ZMQ_REQ)发送请求到ROUTER后,ROUTER是会对客户端进行身份表示的,正式因为有这个身份标示,所以ROUTER才有能力正确的把应答数据准确的传递到来源的客户端。

现在可以回答一下上文的一个思考题了—-ROUTER传递的3帧数据到底是什么数据:

A、第一帧是ROUTER自己加上的消息,这个是ROUTER对ZMQ_REQ所做的一个身份标识。说到身份标识,这里就引入到两种套接字。

一种叫做临时套接字,另外一种叫做永久套接字,他们的区别仅仅是是否使用ZMQ_IDENTITY。

没使用的即默认为临时套接字,我的这个文章里面的例子就是一个临时套接字。对于临时套接字,ROUTER会为它生成一个唯一的UUID,所以可以看到第一帧的长度为5,正是这个UUID。

而使用如下方式设定的套接字,则称为永久套接字。如果这样设置,那第一帧收到的消息长度就是13,而ROUTER也会直接使用www.example.com这个身份来标识这个客户端。

zmq_setsockopt(req, ZMQ_IDENTITY, “www.example.com”, 13);

B、第二帧是一个空帧,这是由REQ加上去的。可以理解为一个分隔符,当ROUTER遇到这个空帧后,就知道下一帧就是真正的请求数据了,这在多种组合模型里面尤其有用。

C、第三帧显然就是真正的请求数据了。这里的例子比较简单,复杂的例子,客户端可能会通过ZMQ_SNDMORE来发送多帧数据。如果是这样,那ROUTER还会继续收到第四帧,第五帧。。。数据的。

2、REQ到达ROUTER,ROUTER会公平排队,并公平的将REQ发往后端。但当后端能力不足的时候,导致ROUTER派发太慢的时候,ROUTER进入高水位的时候,ROUTER会丢弃掉消息的。所以这个得注意监控后台服务的性能。

3、DEALER会负载均衡的将任务派发后连接到它的各个工作Worker。
https://hottaro.com/index.php?document_srl=256&mid=Framework