分类 Demo 下的文章

“各种示例”

'Re: Implement a watchdog' - MARC
update:2021-11-9

i have a ENDAT 7703 motherboard near my desk,
the manual point out some instruction to configure the
watchdog :

//init, select the destination of command...

outportb(0x2e,0x87);
outportb(0x2e,0x87);
outportb(0x2e,0x07);
outportb(0x2f,0x08);

//com

outportb(0x2e,0x30); //main

outportb(0x2f,0x01); //0 to inactive

outportb(0x2e,0xf2); //reset param

outportb(0x2f,0x00); // bit 6 keyboard, bit 7 mouse

outportb(0x2e,0xf0); //time unit conf
outportb(0x2f,0x04); // bit3 = 1 minutes

outportb(0x2e,0xf1); // how much time
outportb(0x2f,1); // 1 minute

I suppose only the kernel can write to the 0x2e and 0x2f address ?

Usually watchdog are detected and shown in dmesg, have i to ask more
detail to the supplier ?

Looks like a semi-standard "motherboard plug and play" SuperIO unlock
sequence to me. If you can find out what chip they use on the
motherboard, you might be able to find a data sheet for it.

Referenced from:https://marc.info/?l=openbsd-tech&m=141821845915978&w=2

代码如下:

/* Example code that uploads a file name 'foo' to a remote script that accepts
 * "HTML form based" (as described in RFC1738) uploads using HTTP POST.
 *
 * The imaginary form we will fill in looks like:
 *
 * <form method="post" enctype="multipart/form-data" action="examplepost.cgi">
 * Enter file: <input type="file" name="sendfile" size="40">
 * Enter file name: <input type="text" name="filename" size="30">
 * <input type="submit" value="send" name="submit">
 * </form>
 *
 */
 
#include <stdio.h>
#include <string.h>
 
#include <curl/curl.h>
 
int main(int argc, char *argv[])
{
  CURL *curl;
  CURLcode res;
 
  curl_mime *form = NULL;
  curl_mimepart *field = NULL;
  struct curl_slist *headerlist = NULL;
  static const char buf[] = "Expect:";
 
  curl_global_init(CURL_GLOBAL_ALL);
 
  curl = curl_easy_init();
  if(curl) {
    /* Create the form */
    form = curl_mime_init(curl);
 
    /* Fill in the file upload field */
    field = curl_mime_addpart(form);
    curl_mime_name(field, "sendfile");
    curl_mime_filedata(field, "postit2.c");
 
    /* Fill in the filename field */
    field = curl_mime_addpart(form);
    curl_mime_name(field, "filename");
    curl_mime_data(field, "postit2.c", CURL_ZERO_TERMINATED);
 
    /* Fill in the submit field too, even if this is rarely needed */
    field = curl_mime_addpart(form);
    curl_mime_name(field, "submit");
    curl_mime_data(field, "send", CURL_ZERO_TERMINATED);
 
    /* initialize custom header list (stating that Expect: 100-continue is not
       wanted */
    headerlist = curl_slist_append(headerlist, buf);
    /* what URL that receives this POST */
    curl_easy_setopt(curl, CURLOPT_URL, "https://example.com/examplepost.cgi");
    if((argc == 2) && (!strcmp(argv[1], "noexpectheader")))
      /* only disable 100-continue header if explicitly requested */
      curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
    curl_easy_setopt(curl, CURLOPT_MIMEPOST, form);
 
    /* Perform the request, res will get the return code */
    res = curl_easy_perform(curl);
    /* Check for errors */
    if(res != CURLE_OK)
      fprintf(stderr, "curl_easy_perform() failed: %s\n",
              curl_easy_strerror(res));
 
    /* always cleanup */
    curl_easy_cleanup(curl);
 
    /* then cleanup the form */
    curl_mime_free(form);
    /* free slist */
    curl_slist_free_all(headerlist);
  }
  return 0;
}

一些说明,以及为什么要disable 100-continue

1)Expect: 100-continue的来龙去脉:

HTTP/1.1 协议里设计 100 (Continue) HTTP 状态码的的目的是,在客户端发送 Request Message 之前,HTTP/1.1 协议允许客户端先判定服务器是否愿意接受客户端发来的消息主体(基于 Request Headers)。

即, Client 和 Server 在Post (较大)数据之前,允许双方“握手”,如果匹配上了,Client 才开始发送(较大)数据。

这么做的原因是,如果客户端直接发送请求数据,但是服务器又将该请求拒绝的话,这种行为将带来很大的资源开销。

协议对 HTTP/1.1 clients 的要求是:

如果 client 预期等待“100-continue”的应答,那么它发的请求必须包含一个 " Expect: 100-continue" 的头域!

2)libcurl 发送大于1024字节数据时启用Expect:100-continue特性:

这也就是 Laruence 在 2011 年撰文所写的:

在使用 curl 做 POST 的时候,当要POST 的数据大于 1024 字节的时候,curl 并不会直接就发起 POST 请求,而是会分为两步:

  1. 发送一个请求,包含一个 "Expect:100-continue" 头域,询问 Server 是否愿意接收数据;
  2. 接收到 Server 返回的100-continue 应答以后,才把数据 POST 给Server;

这是 libcurl 的行为。

第一, libcurl在发送大于1024 字节的 POST 请求时采用了这种方法,但是相对的,它会引起请求延迟的加大。

第二,并不是所有的 web server 都能正确处理并应答“100-continue”,比如 lighttpd,就会返回417”Expectation Failed “,造成请求逻辑出错。

( 注1:lighttpd 1.4 版本有此严重问题,于1.5版本修复。注2:Resin 于 3.0.5 版本增加了对 Expect: 100-continue 的支持。)

3)PHP Curl-library 可以主动封禁此特性:

有人在 PHP手册::curl_setopt 下留言说:

PHP curl 遵从 libcurl 的特性。由于不是所有 web servers 都支持这个特性,所以会产生各种各样的错误。如果你遇到了,可以用下面的命令封禁"Expect"头域:

<?php
    curl_setopt($ch, CURLOPT_HTTPHEADER,array('Expect:'));
?>

spdlog 下载编译

wget https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz
tar xvf v1.9.2.tar.gz
cd spdlog-1.9.2/
cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install .

spdlog 简单使用

#include "spdlog/spdlog.h"
#include "spdlog/sinks/basic_file_sink.h"

int main() 
{
    spdlog::info("Welcome to spdlog!");
    spdlog::error("Some error message with arg: {}", 1);
    
    spdlog::warn("Easy padding in numbers like {:08d}", 12);
    spdlog::critical("Support for int: {0:d};  hex: {0:x};  oct: {0:o}; bin: {0:b}", 42);
    spdlog::info("Support for floats {:03.2f}", 1.23456);
    spdlog::info("Positional args are {1} {0}..", "too", "supported");
    spdlog::info("{:<30}", "left aligned");
    
    spdlog::set_level(spdlog::level::debug); // Set global log level to debug
    spdlog::debug("This message should be displayed..");    
    
    // change log pattern
    spdlog::set_pattern("[%H:%M:%S %z] [%n] [%^---%L---%$] [thread %t] %v");
    
    // Compile time log levels
    // define SPDLOG_ACTIVE_LEVEL to desired level
    SPDLOG_TRACE("Some trace message with param {}", 42);
    SPDLOG_DEBUG("Some debug message");
    
    // Set the default logger to file logger
    auto file_logger = spdlog::basic_logger_mt("basic_logger", "logs/basic.txt");
    spdlog::set_default_logger(file_logger);            
}

spdlog 常见问题

//打印行号
// 先设置日志输出格式
// %s:文件名,my_file.cpp
// %#:行号,123
// %!:函数名,my_func
spdlog::set_pattern("%Y-%m-%d %H:%M:%S [%l] [%t] - <%s>|<%#>|<%!>,%v");

// 使用宏才会有行号
SPDLOG_DEBUG("Some debug message");
spdlog::info("Welcome to spdlog!");

志输出格式具体见:https://github.com/gabime/spdlog/wiki/3.-Custom-formatting

spdlog 推荐用法

#define DEBUG(...) SPDLOG_LOGGER_DEBUG(spdlog::default_logger_raw(), __VA_ARGS__)
#define LOG(...) SPDLOG_LOGGER_INFO(spdlog::default_logger_raw(), __VA_ARGS__)
#define WARN(...) SPDLOG_LOGGER_WARN(spdlog::default_logger_raw(), __VA_ARGS__)
#define ERROR(...) SPDLOG_LOGGER_ERROR(spdlog::default_logger_raw(), __VA_ARGS__)

DEBUG("debug");
LOG("info");
WARN("warn");
ERROR("error");

spdlog 控制台输出

// 设置默认logger为控制台即可
// 设置默认logger,这里是控制台,所以spdlog::info的内容会输出到控制台
auto console = spdlog::stdout_color_mt("console");
spdlog::set_default_logger(console);

spdlog 控制台输出官方代码

#include "spdlog/spdlog.h"
#include "spdlog/sinks/stdout_color_sinks.h"
void stdout_example()
{
    // create color multi threaded logger
    auto console = spdlog::stdout_color_mt("console");    
    //auto err_logger = spdlog::stderr_color_mt("stderr");    
    spdlog::get("console")->info("loggers can be retrieved from a global registry using the spdlog::get(logger_name)");
}

spdlog 同时输出控制台和写日志文件

// spdlog 同时输出console和文件
#define DEBUG(...) SPDLOG_LOGGER_DEBUG(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_DEBUG(spdlog::get("daily_logger"), __VA_ARGS__)
#define LOG(...) SPDLOG_LOGGER_INFO(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_INFO(spdlog::get("daily_logger"), __VA_ARGS__)
#define WARN(...) SPDLOG_LOGGER_WARN(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_WARN(spdlog::get("daily_logger"), __VA_ARGS__)
#define ERROR(...) SPDLOG_LOGGER_ERROR(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_ERROR(spdlog::get("daily_logger"), __VA_ARGS__)

auto console = spdlog::stdout_color_mt("console");
spdlog::set_default_logger(console);
// 每天2:30 am 新建一个日志文件
auto logger = spdlog::daily_logger_mt("daily_logger", "logs/daily.txt", 2, 30);
// 遇到warn flush日志,防止丢失
logger->flush_on(spdlog::level::warn);

spdlog 文件按天分割

#include "spdlog/sinks/daily_file_sink.h"
void daily_example()
{
    // Create a daily logger - a new file is created every day on 2:30am
    auto logger = spdlog::daily_logger_mt("daily_logger", "logs/daily.txt", 2, 30);
}

spdlog 定时flush到文件
spdlog为了提高性能,降低对磁盘的写操作,通过flush机制来一次性把日志写入到文件里面持久化。所以如果没有恰当的配置,停止调试或者进程崩溃的时候会有日志丢失的问题。

//每三秒刷新一次
spdlog::flush_every(std::chrono::seconds(3));

遇到error级别,立即flush到文件:

enum level_enum
{
    trace = SPDLOG_LEVEL_TRACE, // 最低
    debug = SPDLOG_LEVEL_DEBUG,
    info = SPDLOG_LEVEL_INFO,
    warn = SPDLOG_LEVEL_WARN,
    err = SPDLOG_LEVEL_ERROR,
    critical = SPDLOG_LEVEL_CRITICAL, // 最高
    off = SPDLOG_LEVEL_OFF,
    n_levels
};

auto logger = spdlog::daily_logger_mt("daily_logger", "log/daily.txt", 2, 30);
// 遇到warn或者更高级别,比如err,critical 立即flush日志,防止丢失
logger->flush_on(spdlog::level::warn);

spdlog 使用完整代码

#include "spdlog/spdlog.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "spdlog/sinks/daily_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"
#include <iostream>
#include <memory>

// spd 带行号的打印,同时输出console和文件
#define DEBUG(...) SPDLOG_LOGGER_DEBUG(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_DEBUG(spdlog::get("daily_logger"), __VA_ARGS__)
#define LOG(...) SPDLOG_LOGGER_INFO(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_INFO(spdlog::get("daily_logger"), __VA_ARGS__)
#define WARN(...) SPDLOG_LOGGER_WARN(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_WARN(spdlog::get("daily_logger"), __VA_ARGS__)
#define ERROR(...) SPDLOG_LOGGER_ERROR(spdlog::default_logger_raw(), __VA_ARGS__);SPDLOG_LOGGER_ERROR(spdlog::get("daily_logger"), __VA_ARGS__)

int main(int argc, char *argv[]) {
    // 按文件大小
    //auto file_logger = spdlog::rotating_logger_mt("file_log", "log/log.log", 1024 * 1024 * 100, 3);
    // 每天2:30 am 新建一个日志文件
    auto logger = spdlog::daily_logger_mt("daily_logger", "logs/daily.txt", 2, 30);
    // 遇到warn flush日志,防止丢失
    logger->flush_on(spdlog::level::warn);
    //每三秒刷新一次
    spdlog::flush_every(std::chrono::seconds(3));
    
    // Set the default logger to file logger
    auto console = spdlog::stdout_color_mt("console");
    spdlog::set_default_logger(console);
    spdlog::set_level(spdlog::level::debug); // Set global log level to debug

    // change log pattern
    // %s:文件名
    // %#:行号
    // %!:函数名
    spdlog::set_pattern("%Y-%m-%d %H:%M:%S [%l] [%t] - <%s>|<%#>|<%!>,%v");

    LOG("test info");
    ERROR("test error");
    
    // Release and close all loggers
    spdlog::drop_all();
}

spdlog 线程安全
对于sinks,以 _mt 后缀结尾的是线程安全的,比如:daily_file_sink_mt
以_st 后缀结尾的是非线程安全的,比如:daily_file_sink_st

spdlog 接口
spdlog 总体而言提供了日志接口

spdlog::debug(), 默认的日志对象,使用默认的日志信息格式,输出至 stdout。
logger->debug(), 指定日志对象进行日志记录,输出至该日志对象对应的文件中。
SPDLOG_LOGGER_DEBUG(logger), SPDLOG_DEBUG(), 使用宏对以上两种接口进行包装,产生的日志格式包含 文件、函数、行。

spdlog set_level 设置日志级别

低于设置级别的日志将不会被输出。各level排序,数值越大级别越高:

// Runtime log levels
spd::set_level(spd::level::info); //Set global log level to info
console->debug("This message should not be displayed!");
console->set_level(spd::level::debug); // Set specific logger's log level
console->debug("This message should be displayed..");

第一行日志debug级别低于设定的级别info,在level为info时不会被输出。
第二行日志debug级别与设定的级别相同,所以可以显示出来。

typedef enum
{
    trace = 0,
    debug = 1,
    info = 2,
    warn = 3,
    err = 4,
    critical = 5,
    off = 6
} level_enum;

spdlog 同步和异步设置
Asynchronous logging

官方说明:https://github.com/gabime/spdlog/wiki/6.-Asynchronous-logging

默认情况下是不开启异步模式的,开启异步模式方式如下:

size_t q_size = 4096; //queue size must be power of 2
spdlog::set_async_mode(q_size);

队列大小:

队列占用的内存 = 设置的队列大小 * slot的大小, 64位系统下slot大小为104字节。由此可根据系统的log输出总量来确定队列大小。

spdlog drop -- 释放logger

在程序结束时,应该调用drop_all() 释放所有logger。

There is a bug in VS runtime that cause the application dead lock when it exits. If you use async logging, please make sure to call spdlog::drop_all() before main() exit. If some loggers are not in the registry, those should be released manually as well. stackoverflow: std::thread join hangs if called after main exits when using vs2012 rc

// Release and close all loggers
spdlog::drop_all();

或者单独drop某个logger

spd::drop("console");
spd::drop("basic_logger");

spdlog dump_backtrace

// 日志的所有信息能够被储存在一个环形缓冲里(包括 调试/追踪信息),之后可以根据需要将之打印出来。
// 需要时,调用dump_backtrace()

spdlog::enable_backtrace(32); // 保存最后的32条信息,早些时候的信息会被丢掉。
spdlog::dump_backtrace(); // 这时会打印出来,最后的32条信息。

spdlog::logger::dump_backtrace() will not work inside signal handlers.

spdlog_level environment variable

./example SPDLOG_LEVEL=info,mylogger=trace 

也可以使用

export SPDLOG_LEVEL=info
./example

#include "spdlog/cfg/env.h"
void load_levels_example()
{
    // Set the log level to "info" and mylogger to to "trace":
    // SPDLOG_LEVEL=info,mylogger=trace && ./example
    spdlog::cfg::load_env_levels();
    // or from command line:
    // ./example SPDLOG_LEVEL=info,mylogger=trace
    // #include "spdlog/cfg/argv.h" // for loading levels from argv
    // spdlog::cfg::load_argv_levels(args, argv);
}

zmq介绍
ØMQ (也拼写作ZeroMQ,0MQ或ZMQ)是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列, 但是与面向消息的中间件不同,ZeroMQ的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字风格的API。

ZeroMQ是由iMatix公司和大量贡献者组成的社群共同开发的。 ZMQ 是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核。

libzmq 下载地址

https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz

zmq github地址

https://github.com/zeromq/libzmq/

cppzmq github地址

https://github.com/zeromq/cppzmq

cppzmq 下载地址

https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz

cppzmq 介绍

cppzmq is a C++ binding for libzmq. It has the following design goals:

cppzmq maps the libzmq C API to C++ concepts. In particular:
it is type-safe (the libzmq C API exposes various class-like concepts as void*)
it provides exception-based error handling (the libzmq C API provides errno-based error handling)
it provides RAII-style classes that automate resource management (the libzmq C API requires the user to take care to free resources explicitly)
cppzmq is a light-weight, header-only binding. You only need to include the header file zmq.hpp (and maybe zmq_addon.hpp) to use it.
zmq.hpp is meant to contain direct mappings of the abstractions provided by the libzmq C API, while zmq_addon.hpp provides additional higher-level abstractions.

ubuntu 下编译 libzmq

wget https://github.com/zeromq/libzmq/archive/refs/tags/v4.3.4.tar.gz
tar xvf v4.3.4.tar.gz
cd libzmq-4.3.4/
./autogen.sh
mkdir build && cd build
cmake ..
make

cmake的时候提示没有libsodium,安装就好。

sudo apt install libsodium-dev

指定安装路径

cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..

make install 即可。

ubuntu 下编译cppzmq

wget https://github.com/zeromq/cppzmq/archive/refs/tags/v4.8.1.tar.gz
tar xvf v4.8.1.tar.gz
cd cppzmq-4.8.1/
mkdir build && cd build
cmake -DCMAKE_INSTALL_PREFIX=$(pwd)/../../install.x64 ..
make install

Zmq通信场景

  • 线程之间(inproc)
  • 进程之间(ipc)
  • 机器之间(tcp)

Zmq通信模式

  • 请求-回复(Request-reply)。分为ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER
    REQ-REP模式是阻塞式的,也就是说必须要client先发送一条消息给server,然后server才可以返回一个response给client。任何顺序上的错误都会导致报错。

a) 服务端和客户端无论谁先启动,效果是相同的,这点不同于Socket。

b) 在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。

c) 服务端收到信息以后,会send一个“World”给客户端。值得注意的是一定是client连接上来以后,send消息给Server,然后Server再rev然后响应client,这种一问一答式的。如果Server先send,client先rev是会报错的。

d) ZMQ通信通信单元是消息,他除了知道Bytes的大小,他并不关心的消息格式。因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等等。

  • 发布-订阅(Publish-subscribe)。分为ZMQ_PUB、ZMQ_SUB
    发布者只是绑定了端口,并进行信息发布,其并不care是否有接收者,有哪些接收者。

要说明的两点就是:

  1. 服务器端一直不断的广播中,如果中途有 Subscriber 端退出,并不影响他继续的广播,当 Subscriber 再连接上来的时候,收到的就是后来发送的新的信息了。这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题,所谓的 Slow joiner。

注意这个slow joiner问题,之后会为了解决这个问题而设计新的模式。

2.但是,如果 Publisher 中途离开,所有的 Subscriber 会 hold 住,等待 Publisher 再上线的时候,会继续接受信息。

  • 管道(Pipeline)。分为ZMQ_PUSH、ZMQ_PULL
    管道模式(Pipeline) 这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine 比较适合于这种场景。Pipeline的原理就是:有一个Publisher来发布任务,这些任务是可以平行执行的。有一批Worker用于接收任务,Worker处理完任务之后就将结果发送到Sink之中用于归总或进一步处理。

所以要明确的是Pipeline之中并不是服务器,客户端的关系了,而是有三种对象——Ventilator,Worker,Sink
task ventilator 使用的是bind SOCKET_PUSH,将任务分发到 Worker 节点上。而 Worker 节点上,使用 connect SOCKET_PULL 从上游接受任务,并使用connect SOCKET_PUSH 将结果汇集到bind Slink。

  • 对立对(Exclusive pair)。分为ZMQ_PAIR

libzmq 交叉编译

./configure --host=arm-linux-gnueabihf --without-libsodium --prefix=$(pwd)/../install.arm --disable-libbsd

zmq_proxy_steerable 说明

int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);

第四个参数“control”。如果控制socket不为NULL,这个代理支持后续的控制操作。如果这个socket接收到SUSPEND0消息,此代理将延迟它的活动。如果接到了RESUME0消息,它将继续工作。如果收到了TERMINATE0消息,它将平滑的(smoothly)结束。如果接到了STATISTICS0消息 代理将回复控制套接字,发送一个8帧的多部分消息,每个消息具有64位的无符号整数,其顺序如下:前端套接字接收的消息数===>前端套接字接收的字节数===>发送给前端套接字的消息数量===>前端套接字发送的字节数===>后端套接字接收的消息数===>后端套接字接收的字节数===>发送后端套接字的消息数===>发送后端套接字的字节数。
如果控制socket为NULL,此函数和zmq_proxy的工作方式一样。
当控制socket接收到TERMINATE时zmq_proxy_steerable()函数返回0,。否则,返回 -1,并且设置errno为ETERM。
创建一个共享的代理队列

//  Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);
//  Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
//  Start the queue proxy, which runs until ETERM or "TERMINATE" received on the control socket zmq_proxy (frontend, backend, NULL, control);

在另一个节点上创建一个控制器,进程或者其它

void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);
// stop the proxy 
assert (zmq_send (control, "STOP", 5, 0) == 0);
// resume the proxy
assert (zmq_send (control, "RESUME", 7, 0) == 0);
// terminate the proxy
assert (zmq_send (control, "TERMINATE", 10, 0) == 0);

zmq router dealer 介绍

“请求-响应”代理使用ZMQ_ROUTER、ZMQ_DEALER
ZMQ_DEALER
ZMQ_DEALER类型的套接字是用于扩展“请求/应答”套接字的高级模式
发送消息时:当ZMQ_DEALER套接字由于已达到所有对等点的最高水位而进入静音状态时,或者如果根本没有任何对等点,则套接字上的任何zmq_send()操作都应阻塞,直到静音状态结束或至少一个对等方变得可以发送;消息不会被丢弃
接收消息时:发送的每条消息都是在所有连接的对等方之间进行轮询,并且收到的每条消息都是从所有连接的对等方进行公平排队的
将ZMQ_DEALER套接字连接到ZMQ_REP套接字时,发送的每个消息都必须包含一个空的消息部分,定界符以及一个或多个主体部分
REQ和ROUTER交流,DEALER与REP交流。

我们的代理必须是非阻塞的,可以使用zmq_poll()来轮询任何一个套接字上的活动,但我们不能使用REQ-REQ。幸运地是,有两个称为DEALER和ROUTER的套接字,它们能我们可以执行无阻塞的请求-响应

客户端的流程如下:将REQ套接字连接到代理的ROUTER节点上,向ROUTER节点发送“Hello”,接收到“World”的回复
服务端的流程如下:将REP套接字连接到代理的DEALER节点上
代理端的流程如下:
创建一个ROUTER套接字与客户端相连接,创建一个DEALER套接字与服务端相连接
ROUTER套接字从客户端接收请求数据,并把请求数据发送给服务端
DEALER套接字从服务端接收响应数据,并把响应数据发送给客户端
客户端的消息是有顺序到达客户端的,消息会自动进行排队

ZMQ_ROUTER 兼容的对等套接字 ZMQ_DEALER、ZMQ_REQ、ZMQ_ROUTER
ZMQ_DEALER 兼容的对等套接字 ZMQ_ROUTER、ZMQ_REP、ZMQ_DEALER

“请求-响应模型”支持的套接字类型有4种:
ZMQ_REP
ZMQ_REQ
ZMQ_DEALER
ZMQ_ROUTER

php zmq

安装 sudo apt install php-zm

php zmq 示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PUB);
$queue->connect("tcp://127.0.0.1:5554");

while (1) {
  $queue->send("hello there!");
}

php zmq req示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");

/* Connect to an endpoint */
$queue->connect("tcp://127.0.0.1:5555");

/* send and receive */
var_dump($queue->send("hello there, using socket 1")->recv());

?>

php zmq req 非阻塞示例

<?php

/* Create new queue object */
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ, "MySock1");
$queue->connect("tcp://127.0.0.1:5555");

/* Assign socket 1 to the queue, send and receive */
$retries = 5;
$sending = true;
$sent    = false;

$queue->setSockOpt (ZMQ::SOCKOPT_LINGER, 1000);

echo "Trying to send message\n";
do {
    try {
        if ($sending) {
            if ($queue->send("This is a message", ZMQ::MODE_DONTWAIT) !== false) {
                echo "Message sent\n";
                $sent    = true;
                $sending = false;
            }
        }
    } catch (ZMQSocketException $e) {
        die(" - Error: " . $e->getMessage());
    }
    usleep (1000);
} while (1 && --$retries);

$retries = 2;
$receiving = true;
$received  = false;

echo "Trying to receive message\n";
do {
    try {
        if ($receiving) {
            $message = $queue->recv (ZMQ::MODE_DONTWAIT);
            
            if ($message) {
                echo "Received message: " . $message . "\n";
                $receiving = false;
                $received = true;
            }
        }
    } catch (ZMQSocketException $e) {
        die(" - Error: " . $e->getMessage());
    }
    sleep (1);
} while (1 && --$retries);

if (!$received) {
    echo "The receive timed out\n";
}

?>

更多php-zmq示例
https://github.com/zeromq/php-zmq/tree/master/examples

ZeroMQ设置超时等待

设置一下超时等待,避免阻塞。

使用方法:

int timeout = 1000;
zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));

php zmq pipe line 示例
https://www.cnblogs.com/jkko123/p/6294575.html
pipeline中,send.php通过把累加任务分发给50个worker节点计算,然后worker节点计算完成后,把结果发送给result.php进行统一的汇总。

sender:

$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->bind("tcp://127.0.0.1:8881");

worker:

$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$rev->connect("tcp://127.0.0.1:8881"); 
$sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sed->connect("tcp://127.0.0.1:8882");

result:

$rev = new ZMQSocket($context, ZMQ::SOCKET_PULL); 
$rev->bind("tcp://127.0.0.1:8882");

C/C++使用ZeroMQ的Router/Dealer模式搭建高性能后台服务框架
ROUTER/DEALER的优点
没错,就是简单使用ZeroMQ提供的ROUTER/DEALER组合模式,可以轻松搭建一个高性能异步的C/C++后台服务框架。ROUTER可 以高效的接收客户端的请求,而DEALER可以负载均衡的调度后端Worker工作。当客户端的请求特别多,后端Worker处理不过来,需要增加 Worker的时候,也非常简单,新加入的Worker直接Connect到DEALER即可。如此运维起来也非常高效,后端可以非常简单的横向扩展!值的一提的是,ROUTER又叫做XREP,DEALER又叫做XREQ。

Dealer将后端Worker的应答数据转发到Router。
然后由Router寻址将应答数据准确的传递给对应的client。
值得注意的是,Router对client的寻址方式,得看client的‘身份’。
临时身份的client,Router会为其生成一个uuid进行标识。
永久身份的client,Router直接使用该client的身份。
ZMQ_REP套接字,其实就是一个“应答”,即,把应答数据回复给ZMQ_REQ,他们是严格的一问一答的方式。不过组合上 ZMQ_ROUTER,ZMQ_DEALER模式后,后台Worker不再是服务的死穴,可以通过横向扩展多个Worker来提高处理ZMQ_REQ的能 力。

ZMQ_REQ是“问”的套接字,它需要“答”套接字(ZMQ_REP)。不过目前ZMQ_REP已经被 ROUTER/DEALER华丽的包装成高富帅了。ZMQ_REQ只能通过联系ROUTER,然后由这个ROUTER/DEALER组成的DEVICE帮 忙传递“爱意”到达ZMQ_REP了,然后再默默的期待ROUTER传递ZMQ_REQ的“答复”。

ZeroMQ的ROUTER/DEALER模式

1、客户端(ZMQ_REQ)发送请求到ROUTER后,ROUTER是会对客户端进行身份表示的,正式因为有这个身份标示,所以ROUTER才有能力正确的把应答数据准确的传递到来源的客户端。

现在可以回答一下上文的一个思考题了—-ROUTER传递的3帧数据到底是什么数据:

A、第一帧是ROUTER自己加上的消息,这个是ROUTER对ZMQ_REQ所做的一个身份标识。说到身份标识,这里就引入到两种套接字。

一种叫做临时套接字,另外一种叫做永久套接字,他们的区别仅仅是是否使用ZMQ_IDENTITY。

没使用的即默认为临时套接字,我的这个文章里面的例子就是一个临时套接字。对于临时套接字,ROUTER会为它生成一个唯一的UUID,所以可以看到第一帧的长度为5,正是这个UUID。

而使用如下方式设定的套接字,则称为永久套接字。如果这样设置,那第一帧收到的消息长度就是13,而ROUTER也会直接使用www.example.com这个身份来标识这个客户端。

zmq_setsockopt(req, ZMQ_IDENTITY, “www.example.com”, 13);

B、第二帧是一个空帧,这是由REQ加上去的。可以理解为一个分隔符,当ROUTER遇到这个空帧后,就知道下一帧就是真正的请求数据了,这在多种组合模型里面尤其有用。

C、第三帧显然就是真正的请求数据了。这里的例子比较简单,复杂的例子,客户端可能会通过ZMQ_SNDMORE来发送多帧数据。如果是这样,那ROUTER还会继续收到第四帧,第五帧。。。数据的。

2、REQ到达ROUTER,ROUTER会公平排队,并公平的将REQ发往后端。但当后端能力不足的时候,导致ROUTER派发太慢的时候,ROUTER进入高水位的时候,ROUTER会丢弃掉消息的。所以这个得注意监控后台服务的性能。

3、DEALER会负载均衡的将任务派发后连接到它的各个工作Worker。
https://hottaro.com/index.php?document_srl=256&mid=Framework