ZeroMQ: From Getting Started to Mastery
2022-06-26 04:05:00 【Dreamers on the road】
I. ZeroMQ overview
ZeroMQ is a multithreaded networking library built around message queues. It abstracts away the low-level details of socket types, connection handling, framing, and even routing, and provides sockets that work across multiple transport protocols. There are three common communication patterns:
Request-reply model
The requester initiates a request and waits for the responder to reply. From the requester's point of view, every exchange is a send followed by a receive; on the responder's side it is the reverse, a receive followed by a send. Either side can be the "1" in a 1:N arrangement; usually the 1 is regarded as the server and the N as the clients. ZeroMQ also supports routing well (the component that implements routing is called a Device), which extends 1:N to N:M by adding only a few routing nodes. In this model the underlying endpoint addresses are hidden from the upper layer: each request carries an implicit reply address that the application does not need to care about.
Publish-subscribe model
In this model the publisher only sends data in one direction and does not care whether every message reaches a subscriber. If the publisher starts publishing before a subscriber has connected, those messages are simply discarded. Once the subscriber is connected, messages are normally delivered without loss, although a subscriber that cannot keep up can still miss messages when queues fill. Likewise, the subscriber only receives and sends no feedback. If the publisher and subscriber need to interact (for example, to confirm that the subscriber has connected), use an additional socket with the request-reply model.
Pipeline model
In this model the pipe is one-way: data flows in a single direction, from the PUSH end to the PULL end.
II. Workflow
1. Request-reply model
Server implementation :
(1)zmq_ctx_new()
// Returns a ctx_t object
void *zmq_ctx_new (void)
{
    //  We do this before the ctx constructor since its embedded mailbox_t
    //  object needs the network to be up and running (at least on Windows).
    if (!zmq::initialize_network ()) {
        return NULL;
    }
    //  Create 0MQ context.
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
    if (ctx) {
        if (!ctx->valid ()) {
            delete ctx;
            return NULL;
        }
    }
    return ctx;
}
The function returns the context, which is in fact a newly instantiated ctx_t object. Its constructor looks like this:
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#ifdef HAVE_FORK
    _pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
    _vmci_fd = -1;
    _vmci_family = -1;
#endif
    //  Initialise crypto library, if needed.
    zmq::random_open ();
#ifdef ZMQ_USE_NSS
    NSS_NoDB_Init (NULL);
#endif
#ifdef ZMQ_USE_GNUTLS
    gnutls_global_init ();
#endif
}
The constructor mainly sets initial parameters, for example _max_sockets = 1023 (ZMQ_MAX_SOCKETS_DFLT) and _io_thread_count = 1 (ZMQ_IO_THREADS_DFLT), together with a few state flags.
(2)zmq_socket()
void *zmq_socket (void *ctx_, int type_)
{
    //  Fail if the context pointer is NULL or its tag is invalid.
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
        errno = EFAULT;
        return NULL;
    }
    //  Cast the opaque pointer back to ctx_t.
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
    zmq::socket_base_t *s = ctx->create_socket (type_);
    return (void *) s;
}
Parameters:
void *ctx_: the context returned by zmq_ctx_new();
int type_: the socket type.
Socket types:
ZMQ_PAIR 0
ZMQ_PUB 1
ZMQ_SUB 2
ZMQ_REQ 3
ZMQ_REP 4
ZMQ_DEALER 5
ZMQ_ROUTER 6
ZMQ_PULL 7
ZMQ_PUSH 8
ZMQ_XPUB 9
ZMQ_XSUB 10
ZMQ_STREAM 11
Return value: the newly created socket, which is produced by calling create_socket():
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (_slot_sync);
    //  On first use, start the context: initialise the mailbox array, which
    //  has two extra slots for the zmq_ctx_term thread and the reaper thread.
    if (unlikely (_starting)) {
        if (!start ())
            return NULL;
    }
    //  Once zmq_ctx_term() was called, we can't create new sockets.
    if (_terminating) {
        errno = ETERM;
        return NULL;
    }
    //  If max_sockets limit was reached, return error.
    if (_empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
    }
    //  Choose a slot for the socket.
    uint32_t slot = _empty_slots.back ();
    _empty_slots.pop_back ();
    //  Generate new unique socket ID.
    int sid = (static_cast<int> (max_socket_id.add (1))) + 1;
    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        _empty_slots.push_back (slot);
        return NULL;
    }
    _sockets.push_back (s);
    _slots[slot] = s->get_mailbox ();
    return s;
}
The first call to create_socket() invokes start():
bool zmq::ctx_t::start ()
{
    //  Initialise the array of mailboxes. Additional two slots are for
    //  zmq_ctx_term thread and reaper thread.
    _opt_sync.lock ();
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets; // 1023 by default
    const int ios = _io_thread_count; // 1 by default
    _opt_sync.unlock ();
    int slot_count = mazmq + ios + term_and_reaper_threads_count; // 1026 by default
    try {
        //  reserve() only adds capacity; it does not create elements.
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return false;
    }
    //  resize() changes the container size and creates the elements.
    _slots.resize (term_and_reaper_threads_count);
    //  Initialise the infrastructure for zmq_ctx_term thread.
    _slots[term_tid] = &_term_mailbox;
    //  Create the reaper thread.
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!_reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();
    //  Create I/O thread objects and launch them.
    _slots.resize (slot_count, NULL);
    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            delete io_thread;
            goto fail_cleanup_reaper;
        }
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
        io_thread->start ();
    }
    //  In the unused part of the slot array, create a list of empty slots.
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
        _empty_slots.push_back (i);
    }
    _starting = false;
    return true;
fail_cleanup_reaper:
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;
fail_cleanup_slots:
    _slots.clear ();
    return false;
}
(3)zmq_bind()
(4)zmq_recv()
(5)zmq_send()
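#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char* argv[])
{
    void* context = zmq_ctx_new();
    assert(context != NULL);
    void* socket = zmq_socket(context, ZMQ_REP);
    assert(socket != NULL);
    int rc = zmq_bind(socket, "tcp://192.168.207.129:9999");
    assert(rc == 0);
    while (true)
    {
        char buf[10];
        // Leave room for the terminating '\0'. zmq_recv() returns -1 on error
        // and may return more than the buffer size if the message was truncated.
        int bytes = zmq_recv(socket, buf, sizeof(buf) - 1, 0);
        if (bytes < 0)
            break;
        if (bytes > (int)sizeof(buf) - 1)
            bytes = sizeof(buf) - 1;
        buf[bytes] = '\0';
        printf("[Server] Received request message: %d bytes, content: \"%s\"\n", bytes, buf);
        sleep(1);
        const char* replyMsg = "World";
        bytes = zmq_send(socket, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sent reply message: %d bytes, content: \"%s\"\n", bytes, replyMsg);
    }
    zmq_close(socket);
    zmq_ctx_term(context);
    return 0;
}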
Client implementation :
(1)zmq_ctx_new()
(2)zmq_socket()
(3)zmq_connect()
(4)zmq_send()
(5)zmq_recv()
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char* argv[])
{
    printf("Connect to server...\n");
    void* context = zmq_ctx_new();
    assert(context != NULL);
    void* socket = zmq_socket(context, ZMQ_REQ);
    assert(socket != NULL);
    int rc = zmq_connect(socket, "tcp://192.168.207.129:9999");
    assert(rc == 0);
    int k = 0;
    while (true)
    {
        char buf[10];
        const char* requestMsg = "Hello";
        int bytes = zmq_send(socket, requestMsg, strlen(requestMsg), 0);
        if (bytes < 0)
            break;
        printf("[Client] [%d] Sent request message: %d bytes, content: \"%s\"\n", k, bytes, requestMsg);
        // Leave room for the terminating '\0' and clamp a truncated message.
        bytes = zmq_recv(socket, buf, sizeof(buf) - 1, 0);
        if (bytes < 0)
            break;
        if (bytes > (int)sizeof(buf) - 1)
            bytes = sizeof(buf) - 1;
        buf[bytes] = '\0';
        printf("[Client] [%d] Received reply message: %d bytes, content: \"%s\"\n", k, bytes, buf);
        k++;
    }
    zmq_close(socket);
    zmq_ctx_term(context);
    return 0;
}
Makefile
all:client server
client:client.cpp
	g++ -std=c++11 client.cpp -o client -lzmq -lpthread -g
server:server.cpp
	g++ -std=c++11 server.cpp -o server -lzmq -lpthread -g
clean:
	rm -f server client
Run make to build both programs; make clean removes the server and client binaries.
2. Publish-subscribe model
ZeroMQ's publish-subscribe pattern distributes data in one direction: after a client subscribes to the server, the server keeps pushing the messages it generates to its subscribers.
Characteristics:
- One publisher, multiple subscribers, i.e. 1:n;
- The publisher publishes whenever its data changes, and every subscriber can receive and process it. This is the publish/subscribe pattern.
- Note: a SUB socket must set a subscription filter with zmq_setsockopt(); without one it receives no messages.
The publisher uses a PUB socket to send messages to the queue, and subscribers use SUB sockets to receive messages from the queue. New subscribers can join at any time, but they cannot receive earlier messages; existing subscribers can leave at any time. Subscribers can also add a "filter" to receive messages selectively.
Publisher (pub.cpp):
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char* argv[])
{
    void* context = zmq_ctx_new();
    assert(context != NULL);
    void* socket = zmq_socket(context, ZMQ_PUB); // publisher socket
    assert(socket != NULL);
    int ret = zmq_bind(socket, "tcp://192.168.207.129:9999");
    assert(ret == 0);
    int k = 0;
    while (true)
    {
        char buf[1024];
        snprintf(buf, sizeof(buf), "server i=%d", k);
        // Send the terminating '\0' too, so subscribers can print the payload as a C string.
        ret = zmq_send(socket, buf, strlen(buf) + 1, 0);
        if (ret < 0)
            break;
        k++;
        // Throttle the publisher so the output stays readable; remove to publish at full speed.
        sleep(1);
    }
    zmq_close(socket);
    zmq_ctx_term(context);
    return 0;
}
Subscriber (sub.cpp):
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <thread>
// ZeroMQ sockets are not thread-safe, so each thread creates and uses its own
// SUB socket; only the context is shared. Each thread is therefore an
// independent subscriber and receives every published message.
void Recv(void* context, int id)
{
    void* socket = zmq_socket(context, ZMQ_SUB); // subscriber socket
    assert(socket != NULL);
    int ret = zmq_connect(socket, "tcp://192.168.207.129:9999");
    assert(ret == 0);
    // An empty filter subscribes to every message.
    ret = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
    assert(ret == 0);
    while (true)
    {
        char buf[1024];
        memset(buf, 0, sizeof(buf));
        // Receiving at most sizeof(buf)-1 bytes keeps the zeroed buffer NUL-terminated.
        ret = zmq_recv(socket, buf, sizeof(buf) - 1, 0);
        if (ret < 0)
            break;
        if (ret > 0)
        {
            printf("[Thread %d] Recv:%s\n", id, buf);
        }
    }
    zmq_close(socket);
}
int main(int argc, char** argv)
{
    printf("Hello,world!\n");
    void* context = zmq_ctx_new();
    assert(context != NULL);
    std::thread t1(Recv, context, 1);
    std::thread t2(Recv, context, 2);
    t1.join();
    t2.join();
    zmq_ctx_term(context);
    return 0;
}
Makefile:
CXX=g++
CXXFLAGS=-fPIC -std=c++11
LDFLAGS=-lzmq -lpthread
all:pub sub
pub:pub.cpp
	$(CXX) $(CXXFLAGS) pub.cpp -o pub $(LDFLAGS)
sub:sub.cpp
	$(CXX) $(CXXFLAGS) sub.cpp -o sub $(LDFLAGS)
clean:
	rm -f sub pub
3. Push-pull (pipeline) model
In push-pull mode the PUSH side sends (send) and the PULL side receives (recv). One PUSH socket can be connected to several PULL sockets, and the messages it sends are handed to them in turn. For example, if a PUSH socket is connected to three PULL sockets A, B, and C, the first message goes to A, the second to B, the third to C, the fourth to A again, and so on in a cycle.
- Ventilator: generates tasks and distributes them to the workers
- Worker: executes the tasks it receives
- Sink: collects the results from the workers
Ventilator (task distributor)
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char **argv)
{
    void *context = zmq_ctx_new();
    assert(context != NULL);
    void *sender = zmq_socket(context, ZMQ_PUSH);
    assert(sender != NULL);
    int rc = zmq_bind(sender, "tcp://*:6666");
    assert(rc == 0);
    printf("Press Enter when the workers are ready: ");
    getchar();
    printf("Sending tasks to workers...\n");
    while (true)
    {
        const char *taskMsg = "World";
        if (zmq_send(sender, taskMsg, strlen(taskMsg), 0) < 0)
            break;
        printf("[Ventilator] Sent task message content == \"%s\"\n", taskMsg);
    }
    zmq_close(sender);
    zmq_ctx_term(context);
    return 0;
}
Worker
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
int main(int argc, char **argv)
{
    void *context = zmq_ctx_new();
    assert(context != NULL);
    // Tasks arrive from the ventilator on the PULL socket.
    void *receiver = zmq_socket(context, ZMQ_PULL);
    zmq_connect(receiver, "tcp://localhost:6666");
    // Results are forwarded to the sink on the PUSH socket.
    void *sender = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(sender, "tcp://localhost:5555");
    while (1)
    {
        char buffer[256];
        int size = zmq_recv(receiver, buffer, sizeof(buffer) - 1, 0);
        if (size < 0)
        {
            break;
        }
        // The payload carries no terminator; clamp a truncated message and add '\0'.
        if (size > (int)sizeof(buffer) - 1)
            size = sizeof(buffer) - 1;
        buffer[size] = '\0';
        printf("buffer:%s\n", buffer);
        const char *resultMsg = "World";
        zmq_send(sender, resultMsg, strlen(resultMsg), 0);
        printf("[Worker] Sent result message content=\"%s\"\n", resultMsg);
    }
    zmq_close(receiver);
    zmq_close(sender);
    zmq_ctx_term(context);
    return 0;
}
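Sink (result collector)
#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
int main(int argc, char **argv)
{
    void *context = zmq_ctx_new();
    assert(context != NULL);
    void *socket = zmq_socket(context, ZMQ_PULL);
    assert(socket != NULL);
    int rc = zmq_bind(socket, "tcp://*:5555");
    assert(rc == 0);
    while (true)
    {
        char buffer[256];
        int size = zmq_recv(socket, buffer, sizeof(buffer) - 1, 0);
        if (size < 0)
        {
            break;
        }
        // The payload carries no terminator; clamp a truncated message and add '\0'.
        if (size > (int)sizeof(buffer) - 1)
            size = sizeof(buffer) - 1;
        buffer[size] = '\0';
        printf("buffer:%s\n", buffer);
    }
    zmq_close(socket);
    zmq_ctx_term(context);
    return 0;
}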