nanomsg下载编译使用方式基本概念PipelineRequest/ReplyPairPub/SubSurveyBus项目中使用
发布服务订阅服务makefile
nanomsg下载编译下载地址:https://github.com/nanomsg/nanomsg/releases
unzip nanomsg-1.1.5.zip cd nanomsg-1.1.5/ mkdir build cd build cmake .. cmake --build . ctest . sudo cmake --build . --target install sudo ldconfig使用方式
Pipeline (A One-Way Pipe)管道(单向管道)
Request/Reply (I ask, you answer)请求/回复(我问,你答)
Pair (Two Way Radio)配对(双向无线电)
Pub/Sub (Topics & Broadcast)发布/订阅(主题和广播)
Survey (Everybody Votes)调查(所有人投票)
Bus (Routing) 总线(路由)
使用方式示例:https://nanomsg.org/gettingstarted/pipeline.html
接口文档:https://nanomsg.org/v1.0.0/nanomsg.7.html
基本概念关于url
描述:由以下两部分组成:transport://address
- 传输指定要使用的底层传输协议
- 地址部分的含义是特定于底层传输协议的
底层传输协议:
ipc UNIX域套接字
- 在posix兼容的系统上,使用UNIX域套接字,IPC地址是文件引用*.socket文件
- 在Windows上,IPC使用命名管道。 IPC地址是不区分大小写的任意字符串,包含除反斜杠以外的任何字符。
地址示例:
- 使用相对(ipc://test.ipc) 此时在程序当前路径下生成socket文件,即域套接字文件
- 使用绝对(ipc:///tmp/test.ipc) 此时在/tmp目录下生成socket文件,即域套接字文件
tcp tcp://interface:port
port 端口
interface
本地网络接口的IPv4地址,数字形式(192.168.0.111)
本地网络接口的IPv6地址,数字形式(::1)
星号(*)表示所有本地网络接口
地址示例
tcp://*:5555
tcp://192.168.0.111:5555
ws ws://interface:port
port 端口
interface
PipelinePipeline (A One-Way Pipe)管道(单向管道)
此模式对于解决生产者/消费者问题(包括负载平衡)非常有用。消息从推侧流向拉侧。如果连接了多个对等点,则模式将尝试公平分布。
示例代码:pipeline.c
#include#include #include #include #include #include #define NODE0 "node0" #define NODE1 "node1" void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); exit(1); } int node0(const char *url) { int sock; int rv; printf("url: "%s"n", url); if ((sock = nn_socket(AF_SP, NN_PULL)) < 0) { fatal("nn_socket"); } if ((rv = nn_bind(sock, url)) < 0) { fatal("nn_bind"); } for (;;) { char *buf = NULL; int bytes; if ((bytes = nn_recv(sock, &buf, NN_MSG, 0)) < 0) { fatal("nn_recv"); } printf("NODE0: RECEIVED "%s"n", buf); nn_freemsg(buf); } } int node1(const char *url, const char *msg) { int sz_msg = strlen(msg) + 1; // ' ' too int sock; int rv; int bytes; if ((sock = nn_socket(AF_SP, NN_PUSH)) < 0) { fatal("nn_socket"); } if ((rv = nn_connect(sock, url)) < 0) { fatal("nn_connect"); } printf("NODE1: SENDING "%s"n", msg); if ((bytes = nn_send(sock, msg, sz_msg, 0)) < 0) { fatal("nn_send"); } sleep(1); // wait for messages to flush before shutting down return (nn_shutdown(sock, 0)); } int main(const int argc, const char **argv) { if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) return (node0(argv[2])); if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0)) return (node1(argv[2], argv[3])); fprintf(stderr, "Usage: pipeline %s|%s ...'n", NODE0, NODE1); return (1); }
编译
* - gcc pipeline.c -lnanomsg -o pipeline
* 运行
* - ./pipeline node0 ipc:///tmp/pipeline.ipc & node0=$! && sleep 1
* - ./pipeline node1 ipc:///tmp/pipeline.ipc “Hello, World!”
Request/ReplyRequest/Reply (我问, 你答)
请求/应答用于同步通信,其中每个问题都用一个答案进行响应,例如远程过程调用(rpc)。 与Pipeline一样,它也可以执行负载平衡。 这是套件中唯一可靠的消息传递模式,因为如果请求与响应不匹配,它将自动重试。
示例demo:reqprep.c
#include#include #include #include #include #include #define NODE0 "node0" #define NODE1 "node1" #define DATE "DATE" void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); exit(1); } char * date(void) { time_t now = time(&now); struct tm *info = localtime(&now); char *text = asctime(info); text[strlen(text)-1] = ' '; // remove 'n' return (text); } int node0(const char *url) { int sz_date = strlen(DATE) + 1; // ' ' too int sock; int rv; if ((sock = nn_socket(AF_SP, NN_REP)) < 0) { fatal("nn_socket"); } if ((rv = nn_bind(sock, url)) < 0) { fatal("nn_bind"); } for (;;) { char *buf = NULL; int bytes; if ((bytes = nn_recv(sock, &buf, NN_MSG, 0)) < 0) { fatal("nn_recv"); } if ((bytes == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) { printf("NODE0: RECEIVED DATE REQUESTn"); char *d = date(); int sz_d = strlen(d) + 1; // ' ' too printf("NODE0: SENDING DATE %sn", d); if ((bytes = nn_send(sock, d, sz_d, 0)) < 0) { fatal("nn_send"); } } nn_freemsg(buf); } } int node1(const char *url) { int sz_date = strlen(DATE) + 1; // ' ' too char *buf = NULL; int bytes = -1; int sock; int rv; if ((sock = nn_socket(AF_SP, NN_REQ)) < 0) { fatal("nn_socket"); } if ((rv = nn_connect (sock, url)) < 0) { fatal("nn_connect"); } printf("NODE1: SENDING DATE REQUEST %sn", DATE); if ((bytes = nn_send(sock, DATE, sz_date, 0)) < 0) { fatal("nn_send"); } if ((bytes = nn_recv(sock, &buf, NN_MSG, 0)) < 0) { fatal("nn_recv"); } printf("NODE1: RECEIVED DATE %sn", buf); nn_freemsg(buf); return (nn_shutdown(sock, 0)); } int main(const int argc, const char **argv) { if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) return (node0(argv[2])); if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0)) return (node1(argv[2])); fprintf(stderr, "Usage: reqrep %s|%s ...n", NODE0, NODE1); return (1); }
编译
gcc reqrep.c -lnanomsg -o reqrep
运行
./reqrep node0 ipc:///tmp/reqrep.ipc & node0=$! && sleep 1 ./reqrep node1 ipc:///tmp/reqrep.ipc kill $node0
Output
NODE1: SENDING DATE REQUEST DATE NODE0: RECEIVED DATE REQUEST NODE0: SENDING DATE Sat Sep 7 17:39:01 2013 NODE1: RECEIVED DATE Sat Sep 7 17:39:01 2013Pair
Pair (双向通信)
当存在一对一的对等关系时,使用pair模式。 一次只能有一个对等体连接到另一个对等体,但双方都可以自由发言。
示例Demo:pair.c
#include#include #include #include #include #include #define NODE0 "node0" #define NODE1 "node1" void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); exit(1); } int send_name(int sock, const char *name) { printf("%s: SENDING "%s"n", name, name); int sz_n = strlen(name) + 1; // ' ' too return (nn_send(sock, name, sz_n, 0)); } int recv_name(int sock, const char *name) { char *buf = NULL; int result = nn_recv(sock, &buf, NN_MSG, 0); if (result > 0) { printf("%s: RECEIVED "%s"n", name, buf); nn_freemsg(buf); } return (result); } int send_recv(int sock, const char *name) { int to = 100; if (nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) < 0) { fatal("nn_setsockopt"); } for (;;) { recv_name(sock, name); sleep(1); send_name(sock, name); } } int node0(const char *url) { int sock; if ((sock = nn_socket(AF_SP, NN_PAIR)) < 0) { fatal("nn_socket"); } if (nn_bind(sock, url) < 0) { fatal("nn_bind"); } return (send_recv(sock, NODE0)); } int node1(const char *url) { int sock; if ((sock = nn_socket(AF_SP, NN_PAIR)) < 0) { fatal("nn_socket"); } if (nn_connect(sock, url) < 0) { fatal("nn_connect"); } return (send_recv(sock, NODE1)); } int main(const int argc, const char **argv) { if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0)) return (node0(argv[2])); if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0)) return (node1(argv[2])); fprintf(stderr, "Usage: pair %s|%s ...n", NODE0, NODE1); return 1; }
编译
gcc pair.c -lnanomsg -o pair
运行
./pair node0 ipc:///tmp/pair.ipc & node0=$! ./pair node1 ipc:///tmp/pair.ipc & node1=$! sleep 3 kill $node0 $node1
Output
node0: SENDING "node0" node1: SENDING "node1" node1: RECEIVED"node0" node0: SENDING "node0" node0: RECEIVED"node1"Pub/Sub
Pub/Sub(发布/订阅)
此模式用于允许单个广播机构向多个订阅者发布消息,订阅者可以选择限制它们接收的消息。
pubsub.c
#include#include #include #include #include #include #include #define SERVER "server" #define CLIENT "client" void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); } char * date(void) { time_t now = time(&now); struct tm *info = localtime(&now); char *text = asctime(info); text[strlen(text)-1] = ' '; // remove 'n' return (text); } int server(const char *url) { int sock; if ((sock = nn_socket(AF_SP, NN_PUB)) < 0) { fatal("nn_socket"); } if (nn_bind(sock, url) < 0) { fatal("nn_bind"); } for (;;) { char *d = date(); int sz_d = strlen(d) + 1; // ' ' too printf("SERVER: PUBLISHING DATE %sn", d); int bytes = nn_send(sock, d, sz_d, 0); if (bytes < 0) { fatal("nn_send"); } sleep(1); } } int client(const char *url, const char *name) { int sock; if ((sock = nn_socket(AF_SP, NN_SUB)) < 0) { fatal("nn_socket"); } // subscribe to everything ("" means all topics) if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { fatal("nn_setsockopt"); } if (nn_connect(sock, url) < 0) { fatal("nn_connet"); } for (;;) { char *buf = NULL; int bytes = nn_recv(sock, &buf, NN_MSG, 0); if (bytes < 0) { fatal("nn_recv"); } printf("CLIENT (%s): RECEIVED %sn", name, buf); nn_freemsg(buf); } } int main(const int argc, const char **argv) { if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0)) return (server(argv[2])); if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0)) return (client (argv[2], argv[3])); fprintf(stderr, "Usage: pubsub %s|%s ...n", SERVER, CLIENT); return 1; }
Compilation
gcc pubsub.c -lnanomsg -o pubsub
Execution
./pubsub server ipc:///tmp/pubsub.ipc & server=$! && sleep 1 ./pubsub client ipc:///tmp/pubsub.ipc client0 & client0=$! ./pubsub client ipc:///tmp/pubsub.ipc client1 & client1=$!
Output
SERVER: PUBLISHING DATE Sat Sep 7 17:40:11 2013 SERVER: PUBLISHING DATE Sat Sep 7 17:40:12 2013 SERVER: PUBLISHING DATE Sat Sep 7 17:40:13 2013 CLIENT (client0): RECEIVED Sat Sep 7 17:40:13 2013 CLIENT (client1): RECEIVED Sat Sep 7 17:40:13 2013Survey
Survey (每个人投票)
投票模式用于发送一个定时的调查,在调查过期之前,将逐个返回响应。 此模式对于服务发现和投票算法非常有用。
survey.c
#include#include #include #include #include #include #include #define SERVER "server" #define CLIENT "client" #define DATE "DATE" void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); exit(1); } char * date(void) { time_t now = time(&now); struct tm *info = localtime(&now); char *text = asctime(info); text[strlen(text)-1] = ' '; // remove 'n' return (text); } int server(const char *url) { int sock; if ((sock = nn_socket (AF_SP, NN_SURVEYOR)) < 0) { fatal("nn_socket"); } if (nn_bind(sock, url) < 0) { fatal("nn_bind"); } for (;;) { printf("SERVER: SENDING DATE SURVEY REQUESTn"); int bytes = nn_send(sock, DATE, strlen(DATE) + 1, 0); if (bytes < 0) { fatal("nn_send"); } for (;;) { char *buf = NULL; int bytes = nn_recv(sock, &buf, NN_MSG, 0); if (bytes < 0) { if (nn_errno() == ETIMEDOUT) { break; } fatal("nn_recv"); } printf("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf); nn_freemsg(buf); } printf("SERVER: SURVEY COMPLETEn"); sleep(1); // Start another survey in a second } } int client(const char *url, const char *name) { int sock; if ((sock = nn_socket(AF_SP, NN_RESPONDENT)) < 0) { fatal("nn_socket"); } if (nn_connect (sock, url) < 0) { fatal("nn_connect"); } for (;;) { char *buf = NULL; int bytes = nn_recv(sock, &buf, NN_MSG, 0); if (bytes >= 0) { printf("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf); nn_freemsg(buf); char *d = date(); int sz_d = strlen(d) + 1; // ' ' too printf("CLIENT (%s): SENDING DATE SURVEY RESPONSEn", name); if (nn_send(sock, d, sz_d, 0) < 0) { fatal("nn_send"); } } } } int main(const int argc, const char **argv) { if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0)) return (server(argv[2])); if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0)) return (client(argv[2], argv[3])); fprintf(stderr, "Usage: survey %s|%s ...n", SERVER, CLIENT); return 1; }
Compilation
gcc survey.c -lnanomsg -o survey
Execution
./survey server ipc:///tmp/survey.ipc & server=$! ./survey client ipc:///tmp/survey.ipc client0 & client0=$! ./survey client ipc:///tmp/survey.ipc client1 & client1=$! kill $server $client0 $client1 $client2
Output
SERVER: SENDING DATE SURVEY REQUEST SERVER: SURVEY COMPLETE SERVER: SENDING DATE SURVEY REQUEST CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST CLIENT (client1): SENDING DATE SURVEY RESPonSE CLIENT (client0): SENDING DATE SURVEY RESPonSEBus
Bus (路由)
总线协议对于路由应用程序或构建完全互连的网状网络非常有用。 在这种模式中,消息被发送到每个直接连接的对等体。
bus.c
#include#include #include #include #include #include void fatal(const char *func) { fprintf(stderr, "%s: %sn", func, nn_strerror(nn_errno())); exit(1); } int node(const int argc, const char **argv) { int sock; if ((sock = nn_socket (AF_SP, NN_BUS)) < 0) { fatal("nn_socket"); } if (nn_bind(sock, argv[2]) < 0) { fatal("nn_bind"); } sleep(1); // wait for peers to bind if (argc >= 3) { for (int x = 3; x < argc; x++) { if (nn_connect(sock, argv[x]) < 0) { fatal("nn_connect"); } } } sleep(1); // wait for connections int to = 100; if (nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) < 0) { fatal("nn_setsockopt"); } // SEND int sz_n = strlen(argv[1]) + 1; // ' ' too printf("%s: SENDING '%s' onTO BUSn", argv[1], argv[1]); if (nn_send(sock, argv[1], sz_n, 0) < 0) { fatal("nn_send"); } // RECV for (;;) { char *buf = NULL; int recv = nn_recv(sock, &buf, NN_MSG, 0); if (recv >= 0) { printf("%s: RECEIVED '%s' FROM BUSn", argv[1], buf); nn_freemsg(buf); } } return (nn_shutdown(sock, 0)); } int main(int argc, const char **argv) { if (argc >= 3) { return (node(argc, argv)); } fprintf(stderr, "Usage: bus ...n"); return 1; }
Compilation
gcc bus.c -lnanomsg -o bus
Execution
./bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$! ./bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$! ./bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$! ./bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$! sleep 5 kill $node0 $node1 $node2 $node3
Output
node0: SENDING 'node0' onTO BUS node1: SENDING 'node1' onTO BUS node2: SENDING 'node2' onTO BUS node3: SENDING 'node3' onTO BUS node0: RECEIVED 'node1' FROM BUS node0: RECEIVED 'node2' FROM BUS node0: RECEIVED 'node3' FROM BUS node1: RECEIVED 'node0' FROM BUS node1: RECEIVED 'node2' FROM BUS node1: RECEIVED 'node3' FROM BUS node2: RECEIVED 'node0' FROM BUS node2: RECEIVED 'node1' FROM BUS node2: RECEIVED 'node3' FROM BUS node3: RECEIVED 'node0' FROM BUS node3: RECEIVED 'node1' FROM BUS node3: RECEIVED 'node2' FROM BUS项目中使用 发布服务
#include订阅服务#include #include #include #include #include #include void usage(const char *name) { fprintf(stderr, "%s [ bind url]n", name); } int main(int argc, char **argv) { if(argc != 2) { usage(argv[0]); exit(-1); } const char *url = argv[1]; int sock = nn_socket(AF_SP, NN_PUB); if(sock < 0) { fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno)); exit(-1); } if(nn_bind(sock, url) < 0) { fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno)); exit(-1); } while(1) { time_t rawtime; struct tm * timeinfo; time (&rawtime); timeinfo = localtime (&rawtime); char *text = asctime (timeinfo); int textLen = strlen(text); text[textLen - 1] = ' '; printf ("SERVER: PUBLISHING DATE %sn", text); // nn_send(sock, text, textLen, 0); char temp[128] = "DEV|Alarm|IP:192.168.128.12|Port:3721|Chan:1|$DefinitionAbnormal:On|LT:1637908405|$1637908405#1$1"; nn_send(sock, temp, strlen(temp), 0); sleep(1); } return 0; } //run: ./pubserver ipc:///tmp/test.ipc
#includemakefile#include #include #include #include int main(int argc, char **argv) { if(argc != 3) { fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]); exit(-1); } const char *name = argv[1]; const char *url = argv[2]; int sock = nn_socket (AF_SP, NN_SUB); if(sock < 0) { fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno)); exit(-1); } if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "DEV|Alarm", strlen("DEV|Alarm")) < 0) { fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno)); exit(-1); } if (nn_connect(sock, url) < 0) { fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno)); exit(-1); } printf("connect url:%s successn",url); while ( 1 ) { char *buf = NULL; int bytes = nn_recv (sock, &buf, NN_MSG, 0); printf ("CLIENT (%s): RECEIVED %sn", name, buf); nn_freemsg (buf); } nn_shutdown(sock, 0); return 0; } //run: ./pubclient deroy ipc:///tmp/test.ipc
all:pubserver pubclient pubserver:pubserver.c gcc -o pubserver pubserver.c -lnanomsg pubclient:pubclient.c gcc -o pubclient pubclient.c -lnanomsg .PHONY:clean clean: rm -f pubserver pubclient



