zmq介绍
ØMQ (也拼写作ZeroMQ,0MQ或ZMQ)是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字风格的API。
ZeroMQ是由iMatix公司和大量贡献者组成的社群共同开发的。 ZMQ 是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核。
libzmq 下载地址
https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz
zmq github地址
https://github.com/zeromq/libzmq/
cppzmq github地址
https://github.com/zeromq/cppzmq
cppzmq 下载地址
https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz
cppzmq 介绍
cppzmq is a C++ binding for libzmq. It has the following design goals:
cppzmq maps the libzmq C API to C++ concepts. In particular:
it is type-safe (the libzmq C API exposes various class-like concepts as void*)
it provides exception-based error handling (the libzmq C API provides errno-based error handling)
it provides RAII-style classes that automate resource management (the libzmq C API requires the user to take care to free resources explicitly)
cppzmq is a light-weight, header-only binding. You only need to include the header file zmq.hpp (and maybe zmq_addon.hpp) to use it.
zmq.hpp is meant to contain direct mappings of the abstractions provided by the libzmq C API, while zmq_addon.hpp provides additional higher-level abstractions.
ubuntu 下编译 libzmq
wget https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz
tar xvf v4.3.4.tar.gz
cd libzmq-4.3.4/
./autogen.sh
mkdir build && cd build
cmake ..
make
cmake的时候提示没有libsodium,安装就好。
sudo apt install libsodium-dev
指定安装路径
cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..
make install 即可。
ubuntu 下编译cppzmq
wget https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz
tar xvf v4.8.1.tar.gz
cd cppzmq-4.8.1/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..
make install
Zmq通信场景
- 线程之间(inproc)
- 进程之间(ipc)
- 机器之间(tcp)
Zmq通信模式
- 请求-回复(Request-reply)。分为ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER
REQ-REP模式是阻塞式的,也就是说必须要client先发送一条消息给server,然后server才可以返回一个response给client。任何顺序上的错误都会导致报错。
a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。
b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。
c) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。
d) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。
- 发布-订阅(Publish-subscribe)。分为ZMQ_PUB、ZMQ_SUB
发布者只是绑定了端口,并进行信息发布,其并不care是否有接收者,有哪些接收者。
要说明的两点就是:
- 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。
注意这个slow joiner问题,之后会为了解决这个问题而设计新的模式。
2.但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。
- 管道(Pipeline)。分为ZMQ_PUSH、ZMQ_PULL
管道模式(Pipeline) 这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine 比较适合于这种场景。Pipeline的原理就是:有一个Publisher来发布任务,这些任务是可以平行执行的。有一批Worker用于接收任务,Worker处理完任务之后就将结果发送到Sink之中用于归总或进一步处理。
所以要明确的是Pipeline之中并不是服务器,客户端的关系了,而是有三种对象——Ventilator,Worker,Sink
task ventilator 使用的是bind SOCKET_PUSH,将任务分发到 Worker 节点上。而 Worker 节点上,使用 connect SOCKET_PULL 从上游接受任务,并使用connect SOCKET_PUSH 将结果汇集到bind Slink。
- 对立对(Exclusive pair)。分为ZMQ_PAIR
libzmq 交叉编译
./configure --host=arm-linux-gnueabihf --without-libsodium --prefix=$(pwd)/../install.arm --disable-libbsd
zmq_proxy_steerable 说明
int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);
第四个参数“control”。如果控制socket不为NULL,这个代理支持后续的控制操作。如果这个socket接收到SUSPEND0消息,此代理将延迟它的活动。如果接到了RESUME0消息,它将继续工作。如果收到了TERMINATE0消息,它将平滑的(smoothly)结束。如果接到了STATISTICS0消息 代理将回复控制套接字,发送一个8帧的多部分消息,每个消息具有64位的无符号整数,其顺序如下:前端套接字接收的消息数===>前端套接字接收的字节数===>发送给前端套接字的消息数量===>前端套接字发送的字节数===>后端套接字接收的消息数===>后端套接字接收的字节数===>发送后端套接字的消息数===>发送后端套接字的字节数。
如果控制socket为NULL,此函数和zmq_proxy的工作方式一样。
当控制socket接收到TERMINATE时zmq_proxy_steerable()函数返回0,。否则,返回 -1,并且设置errno为ETERM。
创建一个共享的代理队列
// Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);
// Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
// Start the queue proxy, which runs until ETERM or "TERMINATE" received on the control socket zmq_proxy (frontend, backend, NULL, control);
在另一个节点上创建一个控制器,进程或者其它
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);
// stop the proxy
assert (zmq_send (control, "STOP", 5, 0) == 0);
// resume the proxy
assert (zmq_send (control, "RESUME", 7, 0) == 0);
// terminate the proxy
assert (zmq_send (control, "TERMINATE", 10, 0) == 0);
zmq router dealer 介绍
“请求-响应”代理使用ZMQ_ROUTER、ZMQ_DEALER
ZMQ_DEALER
ZMQ_DEALER类型的套接字是用于扩展“请求/应答”套接字的高级模式
发送消息时:当ZMQ_DEALER套接字由于已达到所有对等点的最高水位而进入静音状态时,或者如果根本没有任何对等点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或至少一个对等方变得可以发送;消息不会被丢弃
接收消息时:发送的每条消息都是在所有连接的对等方之间进行轮询,并且收到的每条消息都是从所有连接的对等方进行公平排队的
将ZMQ_DEALER套接字连接到ZMQ_REP套接字时,发送的每个消息都必须包含一个空的消息部分,定界符以及一个或多个主体部分
REQ和ROUTER交流,DEALER与REP交流。
我们的代理必须是非阻塞的,可以使用zmq_poll()来轮询任何一个套接字上的活动,但我们不能使用REQ-REQ。幸运地是,有两个称为DEALER和ROUTER的套接字,它们能我们可以执行无阻塞的请求-响应
客户端的流程如下:将REQ套接字连接到代理的ROUTER节点上,向ROUTER节点发送“Hello”,接收到“World”的回复
服务端的流程如下:将REP套接字连接到代理的DEALER节点上
代理端的流程如下:
创建一个ROUTER套接字与客户端相连接,创建一个DEALER套接字与服务端相连接
ROUTER套接字从客户端接收请求数据,并把请求数据发送给服务端
DEALER套接字从服务端接收响应数据,并把响应数据发送给客户端
客户端的消息是有顺序到达客户端的,消息会自动进行排队
ZMQ_ROUTER 兼容的对等套接字 ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
ZMQ_DEALER 兼容的对等套接字 ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER
“请求-响应模型”支持的套接字类型有4种:
ZMQ_REP
ZMQ_REQ
ZMQ_DEALER
ZMQ_ROUTER
php zmq
安装 sudo apt install php-zm
php zmq 示例
<?php
/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PUB);
$queue->connect("tcp://127.0.0.1:5554");
while (1) {
$queue->send("hello there!");
}
php zmq req示例
<?php
/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");
/* Connect to an endpoint */
$queue->connect("tcp://127.0.0.1:5555");
/* send and receive */
var_dump($queue->send("hello there, using socket 1")->recv());
?>
php zmq req 非阻塞示例
<?php
/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");
$queue->connect("tcp://127.0.0.1:5555");
/* Assign socket 1 to the queue, send and receive */
$retries = 5;
$sending = true;
$sent = false;
$queue->setSockOpt (ZMQ::SOCKOPT_LINGER, 1000);
echo "Trying to send message\n";
do {
try {
if ($sending) {
if ($queue->send("This is a message", ZMQ::MODE_DONTWAIT) !== false) {
echo "Message sent\n";
$sent = true;
$sending = false;
}
}
} catch (ZMQSocketException $e) {
die(" - Error: " . $e->getMessage());
}
usleep (1000);
} while (1 && --$retries);
$retries = 2;
$receiving = true;
$received = false;
echo "Trying to receive message\n";
do {
try {
if ($receiving) {
$message = $queue->recv (ZMQ::MODE_DONTWAIT);
if ($message) {
echo "Received message: " . $message . "\n";
$receiving = false;
$received = true;
}
}
} catch (ZMQSocketException $e) {
die(" - Error: " . $e->getMessage());
}
sleep (1);
} while (1 && --$retries);
if (!$received) {
echo "The receive timed out\n";
}
?>
更多php-zmq示例
https://github.com/zeromq/php-zmq/tree/master/examples
ZeroMQ设置超时等待
设置一下超时等待,避免阻塞。
使用方法:
int timeout = 1000;
zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));
php zmq pipe line 示例
https://www.cnblogs.com/jkko123/p/6294575.html
pipeline中,send.php通过把累加任务分发给50个worker节点计算,然后worker节点计算完成后,把结果发送给result.php进行统一的汇总。
sender:
$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->bind("tcp://127.0.0.1:8881");
worker:
$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$rev->connect("tcp://127.0.0.1:8881");
$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->connect("tcp://127.0.0.1:8882");
result:
$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$rev->bind("tcp://127.0.0.1:8882");
C/C++使用ZeroMQ的Router/Dealer模式搭建高性能后台服务框架
ROUTER/DEALER的优点
没错,就是简单使用ZeroMQ提供的ROUTER/DEALER组合模式,可以轻松搭建一个高性能异步的C/C++后台服务框架。ROUTER可 以高效的接收客户端的请求,而DEALER可以负载均衡的调度后端Worker工作。当客户端的请求特别多,后端Worker处理不过来,需要增加 Worker的时候,也非常简单,新加入的Worker直接Connect到DEALER即可。如此运维起来也非常高效,后端可以非常简单的横向扩展!值的一提的是,ROUTER又叫做XREP,DEALER又叫做XREQ。
Dealer将后端Worker的应答数据转发到Router。
然后由Router寻址将应答数据准确的传递给对应的client。
值得注意的是,Router对client的寻址方式,得看client的‘身份’。
临时身份的client,Router会为其生成一个uuid进行标识。
永久身份的client,Router直接使用该client的身份。
ZMQ_REP套接字,其实就是一个“应答”,即,把应答数据回复给ZMQ_REQ,他们是严格的一问一答的方式。不过组合上 ZMQ_ROUTER,ZMQ_DEALER模式后,后台Worker不再是服务的死穴,可以通过横向扩展多个Worker来提高处理ZMQ_REQ的能 力。
ZMQ_REQ是“问”的套接字,它需要“答”套接字(ZMQ_REP)。不过目前ZMQ_REP已经被 ROUTER/DEALER华丽的包装成高富帅了。ZMQ_REQ只能通过联系ROUTER,然后由这个ROUTER/DEALER组成的DEVICE帮 忙传递“爱意”到达ZMQ_REP了,然后再默默的期待ROUTER传递ZMQ_REQ的“答复”。
ZeroMQ的ROUTER/DEALER模式
1、客户端(ZMQ_REQ)发送请求到ROUTER后,ROUTER是会对客户端进行身份表示的,正式因为有这个身份标示,所以ROUTER才有能力正确的把应答数据准确的传递到来源的客户端。
现在可以回答一下上文的一个思考题了—-ROUTER传递的3帧数据到底是什么数据:
A、第一帧是ROUTER自己加上的消息,这个是ROUTER对ZMQ_REQ所做的一个身份标识。说到身份标识,这里就引入到两种套接字。
一种叫做临时套接字,另外一种叫做永久套接字,他们的区别仅仅是是否使用ZMQ_IDENTITY。
没使用的即默认为临时套接字,我的这个文章里面的例子就是一个临时套接字。对于临时套接字,ROUTER会为它生成一个唯一的UUID,所以可以看到第一帧的长度为5,正是这个UUID。
而使用如下方式设定的套接字,则称为永久套接字。如果这样设置,那第一帧收到的消息长度就是13,而ROUTER也会直接使用www.example.com这个身份来标识这个客户端。
zmq_setsockopt(req, ZMQ_IDENTITY, “www.example.com”, 13);
B、第二帧是一个空帧,这是由REQ加上去的。可以理解为一个分隔符,当ROUTER遇到这个空帧后,就知道下一帧就是真正的请求数据了,这在多种组合模型里面尤其有用。
C、第三帧显然就是真正的请求数据了。这里的例子比较简单,复杂的例子,客户端可能会通过ZMQ_SNDMORE来发送多帧数据。如果是这样,那ROUTER还会继续收到第四帧,第五帧。。。数据的。
2、REQ到达ROUTER,ROUTER会公平排队,并公平的将REQ发往后端。但当后端能力不足的时候,导致ROUTER派发太慢的时候,ROUTER进入高水位的时候,ROUTER会丢弃掉消息的。所以这个得注意监控后台服务的性能。
3、DEALER会负载均衡的将任务派发后连接到它的各个工作Worker。
https://hottaro.com/index.php?document_srl=256&mid=Framework