当前位置:网站首页>Zeromq from getting started to mastering

Zeromq from getting started to mastering

2022-06-26 04:05:00 Dreamers on the road

I. ZeroMQ overview

ZeroMQ is a multi-threaded networking library built around message queues. It abstracts away socket types, connection handling, framing, and even the low-level details of routing, and provides sockets that work across multiple transport protocols. There are three common communication patterns:

Request response model

The requester sends a request and waits for the responder to reply. From the requester's point of view, communication is always a send followed by a receive; conversely, on the responder's side it is always a receive followed by a send. Both the requester and the responder can take part in a 1:N arrangement; usually the "1" is regarded as the server and the "N" as the clients. ZeroMQ also supports routing very well (the component that implements routing is called a Device), so 1:N can be extended to N:M simply by adding a few routing nodes. In this model the underlying endpoint addresses are hidden from the upper layer: each request carries an implicit reply address, and the application does not need to care about it.

Publish and subscribe model

In this model the publisher only sends data in one direction and does not care whether every message reaches the subscribers. If the publisher starts publishing before a subscriber has connected, those messages are simply discarded. Once the subscriber is connected, however, messages are normally no longer lost. Likewise, the subscriber only receives and sends no feedback. If the publisher and subscriber need to interact (for example, to confirm that the subscriber has connected), use an additional socket with the request-reply model to meet that requirement.

Pipe model

In this model the pipe is one-way: data flows in a single direction, pushed from the PUSH end to the PULL end.

II. Workflow steps

1、 Request response model

Server implementation :

(1)zmq_ctx_new()

// return ctx_t object 
void *zmq_ctx_new (void)
{
    //  We do this before the ctx constructor since its embedded mailbox_t
    //  object needs the network to be up and running (at least on Windows).
    if (!zmq::initialize_network ()) {
        return NULL;
    }


    //  Create 0MQ context.
    zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
    if (ctx) {
        if (!ctx->valid ()) {
            delete ctx;
            return NULL;
        }
    }
    return ctx;
}

The function returns the context; internally it simply instantiates a ctx_t object, whose constructor is shown below:

//  Default constructor: puts the context into its initial state.
//  The context is tagged as good, flagged as still starting and not
//  terminating, has no reaper yet, and takes its limits from the
//  compile-time defaults (ZMQ_MAX_SOCKETS_DFLT, ZMQ_IO_THREADS_DFLT).
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#ifdef HAVE_FORK
    //  Record the id of the process that constructed the context.
    _pid = getpid ();
#endif
#ifdef ZMQ_HAVE_VMCI
    //  -1 marks the VMCI descriptor and family as not yet set.
    _vmci_fd = -1;
    _vmci_family = -1;
#endif
    //  Initialise crypto library, if needed.
    zmq::random_open ();

#ifdef ZMQ_USE_NSS
    //  Only when built against NSS.
    NSS_NoDB_Init (NULL);
#endif

#ifdef ZMQ_USE_GNUTLS
    //  Only when built against GnuTLS.
    gnutls_global_init ();
#endif
}

The constructor mainly sets the initial parameters, for example _max_sockets = 1023 and _io_thread_count = 1 (the same values used in the start() listing below), along with some state flags.

(2)zmq_socket()

void *zmq_socket (void *ctx_, int type_)
{
// The object is NULL, Then return to 
    if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
        errno = EFAULT;
        return NULL;
    }
// Strong go 
    zmq::ctx_t *ctx = static_cast<zmq::ctx_t *> (ctx_);
    zmq::socket_base_t *s = ctx->create_socket (type_);
    return (void *) s;
}

Parameters :

void *ctx_: the context returned by zmq_ctx_new();

int type_: the socket type.

Socket types:

ZMQ_PAIR 0

ZMQ_PUB 1

ZMQ_SUB 2

ZMQ_REQ 3

ZMQ_REP 4

ZMQ_DEALER 5

ZMQ_ROUTER 6

ZMQ_PULL 7

ZMQ_PUSH 8

ZMQ_XPUB 9

ZMQ_XSUB 10

ZMQ_STREAM 11

Return value: the newly created socket, which is produced by calling create_socket().

//  Creates a socket of the given type inside this context and registers
//  its mailbox in the slot table. The whole operation runs under _slot_sync.
//  Returns NULL when start () fails, when termination has begun (errno =
//  ETERM), when no empty slot is left (errno = EMFILE), or when
//  socket_base_t::create () fails.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    scoped_lock_t locker (_slot_sync);

    //  While the context is still starting, bring up the infrastructure
    //  first (mailbox array, reaper and I/O threads) via start ().
    if (unlikely (_starting) && !start ())
        return NULL;

    //  Once zmq_ctx_term() was called, we can't create new sockets.
    if (_terminating) {
        errno = ETERM;
        return NULL;
    }

    //  No empty slot left means the max_sockets limit was reached.
    if (_empty_slots.empty ()) {
        errno = EMFILE;
        return NULL;
    }

    //  Take the last empty slot for the new socket.
    const uint32_t free_slot = _empty_slots.back ();
    _empty_slots.pop_back ();

    //  Generate new unique socket ID.
    const int socket_id = static_cast<int> (max_socket_id.add (1)) + 1;

    //  Create the socket; on failure give the slot back.
    socket_base_t *const sock =
      socket_base_t::create (type_, this, free_slot, socket_id);
    if (sock == NULL) {
        _empty_slots.push_back (free_slot);
        return NULL;
    }

    //  Register the socket and its mailbox.
    _sockets.push_back (sock);
    _slots[free_slot] = sock->get_mailbox ();
    return sock;
}

On first use, create_socket() calls the start() function:

//  Brings up the context infrastructure: sizes the slot table, registers
//  the term mailbox, creates and starts the reaper and the I/O threads,
//  and fills the list of empty slots.
//  Returns true on success (and clears _starting). On failure returns
//  false after clearing _slots; allocation failures set errno to ENOMEM.
bool zmq::ctx_t::start ()
{
    //  Initialise the array of mailboxes. Additional two slots are for
    //  zmq_ctx_term thread and reaper thread.
    //  The two options are read under _opt_sync.
    _opt_sync.lock ();
    const int term_and_reaper_threads_count = 2;
    const int mazmq = _max_sockets; // 1023 in the article's walkthrough
    const int ios = _io_thread_count;// 1 in the article's walkthrough
    _opt_sync.unlock ();
    int slot_count = mazmq + ios + term_and_reaper_threads_count;// 1023 + 1 + 2 = 1026 in the walkthrough
    try {
        //  reserve () only raises the capacity; no elements are created yet.
        _slots.reserve (slot_count);
        _empty_slots.reserve (slot_count - term_and_reaper_threads_count);
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return false;
    }
    //  resize () changes the size: the two entries for the term and reaper
    //  mailboxes now exist.
    _slots.resize (term_and_reaper_threads_count);


    //  Initialise the infrastructure for zmq_ctx_term thread.
    _slots[term_tid] = &_term_mailbox;


    //  Create the reaper; it is started only after its mailbox has been
    //  validated and registered in its slot.
    _reaper = new (std::nothrow) reaper_t (this, reaper_tid);
    if (!_reaper) {
        errno = ENOMEM;
        goto fail_cleanup_slots;
    }
    if (!_reaper->get_mailbox ()->valid ())
        goto fail_cleanup_reaper;
    _slots[reaper_tid] = _reaper->get_mailbox ();
    _reaper->start ();


    //  Create I/O thread objects and launch them.
    //  Grow the table to its full size first; new entries are NULL.
    _slots.resize (slot_count, NULL);
    //  I/O threads occupy the slots right after the term and reaper slots.
    for (int i = term_and_reaper_threads_count;
         i != ios + term_and_reaper_threads_count; i++) {
        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
        if (!io_thread) {
            errno = ENOMEM;
            goto fail_cleanup_reaper;
        }
        if (!io_thread->get_mailbox ()->valid ()) {
            //  Not yet stored in _io_threads, so it is deleted here.
            delete io_thread;
            goto fail_cleanup_reaper;
        }
        _io_threads.push_back (io_thread);
        _slots[i] = io_thread->get_mailbox ();
        io_thread->start ();
    }


    //  In the unused part of the slot array, create a list of empty slots.
    //  Indices are pushed from highest to lowest, so back () of the list
    //  is the lowest free index.
    for (int32_t i = static_cast<int32_t> (_slots.size ()) - 1;
         i >= static_cast<int32_t> (ios) + term_and_reaper_threads_count; i--) {
        _empty_slots.push_back (i);
    }


    _starting = false;
    return true;


    //  Failure after the reaper object exists: stop and destroy it, then
    //  fall through to the slot cleanup.
fail_cleanup_reaper:
    _reaper->stop ();
    delete _reaper;
    _reaper = NULL;


    //  Common failure exit: drop every registered slot.
fail_cleanup_slots:
    _slots.clear ();
    return false;
}

(3)zmq_bind()

(4)zmq_recv()

(5)zmq_send()

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>

int main(int argc, char* argv[]) 
{
    void* context=zmq_ctx_new();
    void* socket=zmq_socket(context,ZMQ_REP);
    zmq_bind(socket,"tcp://192.168.207.129:9999");

    while(true)
    {
        char buf[10];
        int bytes=zmq_recv(socket,buf,10,0);
        buf[bytes]='\0';
        printf("[Server] Received Request Message:%d bytes,content:\"%s\"\n",bytes,buf);

        sleep(1);

        const char* replyMsg="World";
        bytes=zmq_send(socket,replyMsg,strlen(replyMsg),0);
        printf("[Server] Sended Reply Message:%d bytes,content:\"%s\"\n",bytes,replyMsg);
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);
    return 0;
}

Client implementation :

(1)zmq_ctx_new()

(2)zmq_socket()

(3)zmq_connect()

(4)zmq_send()

(5)zmq_recv()

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main(int argc, char* argv[])
{
    printf("Connect to server...\n");

    void* context=zmq_ctx_new();
    void* socket=zmq_socket(context,ZMQ_REQ);
    zmq_connect(socket,"tcp://192.168.207.129:9999");

    int k=0;
    while(true)
    {
        char buf[10];
        const char* requestMsg="Hello";
        int bytes=zmq_send(socket,requestMsg,strlen(requestMsg),0);
        printf("[Client] [%d] Send request Message: %d bytes,content:\"%s\"\n",k,bytes,requestMsg);

        bytes=zmq_recv(socket,buf,10,0);
        buf[bytes]='\0';
        printf("[Client] [%d] Received Reply Message: %d bytes,content:\"%s\"\n",k,bytes,buf);

        k++;
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);
    return 0;
}

 MakeFile

# Builds the request-reply example programs.
# Default target: build both the client and the server binary.
all:client server

# Client: compiled as C++11 with debug info (-g), linked against libzmq and pthread.
client:client.cpp
	g++ -std=c++11 client.cpp -o client -lzmq -lpthread -g

# Server: same compiler and linker settings as the client.
server:server.cpp
	g++ -std=c++11 server.cpp -o server -lzmq -lpthread -g

# Remove the built binaries.
clean:
	rm -f server client

Run make to build both programs; make clean removes the server and client binaries.

2、 Publish-subscribe pattern:

ZeroMQ's publish-subscribe pattern is one-way data distribution: after a client subscribes to the server, the server continuously pushes the messages it generates to the subscribers.

characteristic :

  • One publisher, multiple subscribers, i.e. 1:n;
  • When the publisher's data changes it publishes the data, and all subscribers receive and process it. This is the publish/subscribe model.
  • Note: a SUB socket must set its message filter with zmq_setsockopt( ) when subscribing;

The publisher uses a PUB socket to send messages to the queue, and subscribers use a SUB socket to receive the stream of messages from the queue. New subscribers can join at any time, but they cannot receive messages published before they joined; existing subscribers can leave at any time. Subscribers can also add a "filter" to receive messages selectively.

 

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>

int main(int arg,char* argv[])
{
    void* context=zmq_ctx_new();
    assert(context!=NULL);

    void* socket=zmq_socket(context,ZMQ_PUB);//socket Publisher mode 
    assert(socket!=NULL);

    int ret=zmq_bind(socket,"tcp://192.168.207.129:9999");
    assert(ret==0);

    int k=0;
    while(true)
    {
        char buf[1024];
        memset(buf,0,sizeof(buf));
        snprintf(buf,sizeof(buf),"server i=%d",k);
        ret=zmq_send(socket,buf,strlen(buf)+1,0);
        k++;
    }

    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>
#include <thread>

#define TRUE 1

void Recv(void* arg)
{
    while(TRUE)
    {
        void* socket=arg;
        printf("into while\n");
        char buf[1024];
        memset(buf,0,sizeof(buf));
        int ret=zmq_recv(socket,buf,sizeof(buf)-1,0);
        if(ret>0)
        {
            printf("Recv:%s\n",buf);
        }
    }
}

// Subscriber: connects a ZMQ_SUB socket to the publisher, subscribes to every
// message (empty prefix filter) and hands the socket to one receiver thread.
// ZeroMQ sockets are not thread-safe, so exactly one thread may use the
// socket at a time; main touches it again only after that thread is joined.
int main(int argc, char** argv)
{
    printf("Hello,world!\n");

    void* context=zmq_ctx_new();
    assert(context!=NULL);

    void* socket=zmq_socket(context,ZMQ_SUB);// SUB socket: the subscribing side
    assert(socket!=NULL);

    int ret=zmq_connect(socket,"tcp://192.168.207.129:9999");
    assert(ret==0);

    // An empty filter of length 0 subscribes to all messages.
    ret=zmq_setsockopt(socket,ZMQ_SUBSCRIBE,"",0);
    assert(ret==0);

    // A single receiver only: two threads calling zmq_recv on the same
    // socket concurrently would be a data race inside libzmq.
    std::thread receiver(Recv,socket);
    receiver.join();

    zmq_close(socket);
    zmq_ctx_destroy(context);
    
    return 0;
}

makefile:

# Builds the publish-subscribe example programs.
# Default target: build both the publisher and the subscriber binary.
all:pub sub

# Compiler and flags shared by both targets.
# CXXFLAGS ends with -o on purpose: in each recipe the word that follows
# $(CXXFLAGS) is therefore taken as the output file name.
CXX=g++
CXXFLAGS=-fPIC  -std=c++11 -o
LDFLAGS=-lzmq -lpthread 

# Publisher binary.
pub:pub.cpp 
	$(CXX) pub.cpp $(CXXFLAGS) pub $(LDFLAGS)

# Subscriber binary.
sub:sub.cpp
	$(CXX) sub.cpp $(CXXFLAGS) sub $(LDFLAGS)

# Remove the built binaries.
clean:
	rm -f sub pub

3、 Push-pull pattern

In the push-pull pattern, the PUSH side sends (send) and the PULL side receives (recv). A PUSH socket can be connected to multiple PULL sockets, and the messages it sends are distributed to the PULL sockets in turn. For example, if a PUSH socket is connected to three PULL sockets A, B and C, the first message goes to A, the second to B, the third to C, the fourth to A again, and so on in a cycle.

  • At the top is the task distributor (ventilator), which generates the tasks
  • In the middle are the executors (workers)
  • At the bottom is the receiver that collects the results (sink)

  Distributor (ventilator)

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>


int main(int argc,char **argv)
{
    void * context=zmq_ctx_new();
    void * sender=zmq_socket(context, ZMQ_PUSH);
    zmq_bind(sender, "tcp://*:6666");
    printf("Press Enter when the workers are ready: ");
    getchar();
    printf("Sending tasks to workers...\n");
    while(true)
    { 
        const char * replyMsg="World";
        zmq_send(sender, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message content == \"%s\"\n", replyMsg);
    }


    zmq_close(sender);
    zmq_ctx_destroy(context);

    return 0;
}

Executor (worker)

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <zmq.h>

int main(int argc, char **argv)
{
    void * context=zmq_ctx_new();
    void * recviver=zmq_socket(context, ZMQ_PULL);
    zmq_connect(recviver, "tcp://localhost:6666");
    void * sender=zmq_socket(context, ZMQ_PUSH);
    zmq_connect(sender, "tcp://localhost:5555");


    while(1)
    {
        char buffer [256];
        int size=zmq_recv (recviver, buffer, 255, 0);
        if(size < 0)
        {
            break;
        }
        printf("buffer:%s\n",buffer);
        const char * replyMsg="World";
        zmq_send(sender, replyMsg, strlen(replyMsg), 0);
        printf("[Server] Sended Reply Message content=\"%s\"\n", replyMsg);
    }

    zmq_close(recviver);
    zmq_close(sender);
    zmq_ctx_destroy(context);

    return 0;
}

Receiver (sink)

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>


int main(int argc, char **argv)
{
    void * context=zmq_ctx_new();
    void * socket=zmq_socket(context,ZMQ_PULL);
    zmq_bind(socket, "tcp://*:5555");


    while(true)
    { 
       char buffer [256];
       int size=zmq_recv(socket,buffer,255,0);
       if(size<0)
       {
           break;
       }
        printf("buffer:%s\n",buffer);
    }


    zmq_close(socket);
    zmq_ctx_destroy(context);

    return 0;
}

原网站

版权声明
本文为[Dreamers on the road]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202180539489527.html