Deep analysis of epoll reactor code

2022-06-25 05:20:00 Halya


Preface

These are the author's reading notes. Please do not repost without permission. If they help you, remember to give them a like (●’◡’●)
This article uses figures, code, and comments to help readers understand how an epoll reactor is implemented. At the end I attach my function diagram of the code for reference (it is my first time drawing with XMind, so it is a bit messy; please bear with me).


I. The epoll reactor model

1. The traditional epoll server model

Listen for readable events ⇒ data arrives ⇒ the read event fires ⇒
epoll_wait() returns ⇒ read the message ⇒ write the echoed message back ⇒ continue with epoll_wait()
⇒ repeat until the program stops

2. The epoll reactor server model

Listen for readable events ⇒ data arrives ⇒ the read event fires ⇒
epoll_wait() returns ⇒
read the data; take the node off the tree; register a write event with its write callback; put the node back on the tree (all inside the read callback)

Listen for writable events ⇒ the peer can receive ⇒ the write event fires ⇒
epoll_wait() returns ⇒
write the data; take the node off the tree; register a read event with its read callback; put the node back on the tree (all inside the write callback)
⇒ the two phases alternate until the program stops
Taking a node off the tree and putting it back corresponds to switching between the non-pending and pending states, as the figure below shows:
[Figure: event transitions between the non-pending, pending, and active states]
An event is in one of three states: non-pending (initialized but not yet monitored), pending (initialized and being monitored), and active.
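The three states can be written down as a tiny state machine. This is only an illustrative sketch: the enum names and the `ev_next` function are assumptions made here, not identifiers from the server code, and the transition rules are one reading of the description above.

```c
/* The three event states described above; all names are illustrative. */
enum ev_state { EV_NON_PENDING, EV_PENDING, EV_ACTIVE };

/* Operations that move an event between states (assumed mapping):
 *   OP_ADD  : node put on the tree       non-pending -> pending
 *   OP_FIRE : epoll_wait() reports it    pending     -> active
 *   OP_DEL  : node taken off the tree    any state   -> non-pending */
enum ev_op { OP_ADD, OP_FIRE, OP_DEL };

enum ev_state ev_next(enum ev_state s, enum ev_op op)
{
    switch (op) {
    case OP_ADD:  return s == EV_NON_PENDING ? EV_PENDING : s;
    case OP_FIRE: return s == EV_PENDING ? EV_ACTIVE : s;
    case OP_DEL:  return EV_NON_PENDING;
    }
    return s;                  /* unknown operation: state unchanged */
}
```

One read-then-write exchange in the reactor walks through this cycle twice: once for the read event and once for the write event.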

3. Why is the epoll reactor model designed this way?

In the traditional epoll server model, a socket needs only one registration on the tree to both receive and send. In the epoll reactor model, the same socket goes onto the tree at least twice per exchange: once to wait for readability and once to wait for writability. Events switch frequently between pending and non-pending. This costs some performance, but it is deliberate: the server writes only after epoll has reported the socket writable, which makes it more robust against errors or blocking while sending and receiving data (see below). The traditional epoll model offers no such protection.

Sliding window mechanism

Suppose the server has just read data from a client whose receive window is full, for example an attacker who deliberately never drains it, and the server has not registered a writable event. Once the server's own send buffer fills up as well, a blocking send() stalls there: no data reaches the client, and the whole server program is blocked. (On a non-blocking socket, send() fails with EAGAIN instead, and the data still cannot be delivered.)
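To see what a full buffer looks like from the sender's side, here is a small sketch that assumes a non-blocking socket. `set_nonblocking` and `fill_send_buffer` are hypothetical helper names introduced for illustration; they do not appear in the server code.

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Put fd into non-blocking mode while preserving its other flags. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Keep sending until the kernel refuses more data.
 * Returns the total number of bytes queued; errno tells why it stopped. */
long fill_send_buffer(int fd)
{
    char chunk[4096] = {0};
    long total = 0;
    for (;;) {
        ssize_t n = send(fd, chunk, sizeof(chunk), 0);
        if (n < 0)
            return total;      /* EAGAIN/EWOULDBLOCK: buffer full, wait for EPOLLOUT */
        total += n;
    }
}
```

When the peer never reads, the loop ends with errno set to EAGAIN (or EWOULDBLOCK). That is the moment at which a reactor would stop writing and wait for EPOLLOUT rather than retry.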

The SIGPIPE signal

Suppose the client sends data and then terminates abnormally, so a FIN reaches the server. If the server has not registered a writable-event listener, then after reading the data it writes straight to a socket whose peer is gone, the TCP stack raises SIGPIPE in the server, and the default action terminates the server process. (Strictly speaking, the first write after the FIN usually provokes an RST from the peer, and it is a later write on the reset connection that raises SIGPIPE.)
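Independently of the reactor design, two common safeguards keep a write to a dead peer from killing the process. The sketch below assumes Linux (MSG_NOSIGNAL is not available on every platform); `ignore_sigpipe` and `send_nosignal` are hypothetical helper names, not part of the server code.

```c
#include <errno.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Option 1: ignore SIGPIPE process-wide, so a write to a closed
 * connection fails with EPIPE instead of terminating the process. */
void ignore_sigpipe(void)
{
    signal(SIGPIPE, SIG_IGN);
}

/* Option 2 (Linux): suppress the signal for a single send() call. */
ssize_t send_nosignal(int fd, const void *buf, size_t len)
{
    return send(fd, buf, len, MSG_NOSIGNAL);
}
```

With either option the caller sees a return value of -1 with errno set to EPIPE and can close the connection in its normal error path.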

II. epoll reactor model: server code

#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <strings.h>   /* bzero() */
#include <stdlib.h>
#include <time.h>

#define MAX_EVENTS 1024
#define SERVER_PORT 8888

struct my_events {
    void       *m_arg;                                       // generic argument passed to the callback (the tricky part)
    int        m_event;                                      // event being monitored
    int        m_fd;                                         // file descriptor being monitored
    void       (*call_back)(int fd, int event, void *arg);   // callback function

    char       m_buf[BUFSIZ];
    int        m_buf_len;
    int        m_status;                                     // on the red-black tree? 1 -> yes, 0 -> no
    time_t     m_lasttime;                                   // last time this entry was put on the red-black tree
};

int                    ep_fd;                                // root of the red-black tree (handle returned by epoll_create)
struct my_events       ep_events[MAX_EVENTS];                // objects defined outside any function are zero-initialized (.bss segment)

/* Initialize the listening socket */
void initlistensocket(int ep_fd, unsigned short port);
/* Initialize the members of a my_events structure */
void eventset(struct my_events *my_ev, int fd, void (*call_back)(int fd, int event, void *arg), void *event_arg);
/* Add a file descriptor and its structure to the red-black tree */
void eventadd(int ep_fd, int event, struct my_events *my_ev);
/* Remove a file descriptor and its structure from the red-black tree */
void eventdel(int ep_fd, struct my_events *ev);
/* Send data */
void senddata(int client_fd, int event, void *arg);
/* Receive data */
void recvdata(int client_fd, int event, void *arg);
/* Callback: accept a connection */
void acceptconnect(int listen_fd, int event, void *arg);


int main(void)
{
    
   unsigned short port = SERVER_PORT;

   ep_fd = epoll_create(MAX_EVENTS);                         // create the red-black tree; the handle goes into the global ep_fd
   if (ep_fd < 0)                                            // epoll_create returns -1 on failure
   {
      printf("create ep_fd in %s error: %s \n", __func__, strerror(errno));
      exit(1);
   }

   /* Initialize the listening socket */
   initlistensocket(ep_fd, port);

   int checkpos = 0;
   int i;
   struct epoll_event events[MAX_EVENTS]; // output argument of epoll_wait (array that receives the ready events)
   while (1)
   {
    
      /* Timeout check: examine 100 connections per pass and close any client that has been silent for 60 s */
      long now = time(NULL); // current time
      for (i=0; i<100; i++,checkpos++) // check 100 entries per pass; checkpos remembers where to resume
      {
         if (checkpos == MAX_EVENTS-1) // the last slot holds listen_fd, so wrap around before it
             checkpos = 0;
         if (ep_events[checkpos].m_status != 1) // not on the red-black tree
             continue;

         long spell_time = now - ep_events[checkpos].m_lasttime; // how long the client has been inactive
         if (spell_time >= 60) // inactive for 60 s or more
         {
             printf("[fd= %d] timeout \n", ep_events[checkpos].m_fd);
             eventdel(ep_fd, &ep_events[checkpos]); // remove the client from the red-black tree first
             close(ep_events[checkpos].m_fd); // then close the connection
         }
      }

      /* Wait on the red-black tree; ready events are stored in the events array */
      int n_ready = epoll_wait(ep_fd, events, MAX_EVENTS, 1000); // returns 0 after 1 second if nothing is ready
      if (n_ready < 0 && errno != EINTR)//EINTR:interrupted system call
      {
    
          perror("epoll_wait");
          break;
      }

      for (i=0; i<n_ready; i++)
      {
    
           // events[i].data.ptr carries back the pointer to our my_events structure
           struct my_events *ev = (struct my_events *)(events[i].data.ptr);
           if ((events[i].events & EPOLLIN) && (ev->m_event & EPOLLIN))  // ready to read
               ev->call_back(ev->m_fd, events[i].events, ev->m_arg);
           if ((events[i].events & EPOLLOUT) && (ev->m_event & EPOLLOUT)) // ready to write
               ev->call_back(ev->m_fd, events[i].events, ev->m_arg);
      }
   }
   return 0;
}     

/* Initialize the listening socket */
void initlistensocket(int ep_fd, unsigned short port)
{
	int                  listen_fd;
	struct sockaddr_in   listen_socket_addr;

	printf("\n initlistensocket() \n");

	/* Create the socket first; setsockopt() needs a valid descriptor */
	listen_fd = socket(AF_INET, SOCK_STREAM, 0);
	if (listen_fd < 0)
	{
		perror("socket");
		exit(1);
	}
	fcntl(listen_fd, F_SETFL, O_NONBLOCK); // put the socket into non-blocking mode
	/* In non-blocking mode, I/O calls return immediately when no data can be read or the write buffer is full, instead of blocking. */

	int opt = 1;
	setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // allow the address to be reused
	/* SO_REUSEADDR matters when the listening server is restarted: without it, bind() may fail. SOL_SOCKET: To manipulate options at the sockets API level, level is specified as SOL_SOCKET. */

	/* Fill in the address before binding */
	bzero(&listen_socket_addr, sizeof(listen_socket_addr));
	listen_socket_addr.sin_family      = AF_INET;
	listen_socket_addr.sin_port        = htons(port);
	listen_socket_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	/* Bind */
	if (bind(listen_fd, (struct sockaddr *)&listen_socket_addr, sizeof(listen_socket_addr)) < 0)
	{
		perror("bind");
		exit(1);
	}
	/* Set the backlog */
	listen(listen_fd, 128);

	/* Initialize the my_events entry for listen_fd (last slot of the array) */
	eventset(&ep_events[MAX_EVENTS-1], listen_fd, acceptconnect, &ep_events[MAX_EVENTS-1]);
	/* Put listen_fd on the red-black tree */
	eventadd(ep_fd, EPOLLIN, &ep_events[MAX_EVENTS-1]);

    return ;
}

/* Initialize the members of a my_events structure */
void eventset(struct my_events *my_ev, int fd, void (*call_back)(int, int, void *), void *event_arg)
{
   my_ev->m_fd       = fd;
   my_ev->m_event    = 0; // the monitored event is not known yet, so start at 0
   my_ev->m_arg      = event_arg; // points back to this structure
   my_ev->call_back  = call_back;

   my_ev->m_status   = 0; // 0 means not on the red-black tree
   my_ev->m_lasttime = time(NULL); // absolute time at which eventset was called
   return ;
}

/* Add a file descriptor and its structure to the red-black tree */
void eventadd(int ep_fd, int event, struct my_events *my_ev)
{
  int op;
  struct epoll_event epv;
  epv.data.ptr = my_ev; // epoll_wait hands this pointer back in events[i].data.ptr; when the event fires, the main loop reads it, compares the event type, and runs the callback
  epv.events   = my_ev->m_event = event; // EPOLLIN or EPOLLOUT; level-triggered (LT) by default

  if (my_ev->m_status == 0)
  {
    op = EPOLL_CTL_ADD;
  }
  else
  {
    printf("\n add error: already on tree \n");
    return ;
  }

  if (epoll_ctl(ep_fd, op, my_ev->m_fd, &epv) < 0) // the actual add
  {
     printf("\n event add failed [fd= %d] [events= %d] \n", my_ev->m_fd, my_ev->m_event);
  }
  else
  {
     my_ev->m_status = 1;
     printf("\n event add ok [fd= %d] [events= %d] \n", my_ev->m_fd, my_ev->m_event);
  }

  return ;
}
/* Remove a file descriptor and its structure from the red-black tree */
void eventdel(int ep_fd, struct my_events *ev)
{
    
  if(ev->m_status != 1)
     return ;

  epoll_ctl(ep_fd, EPOLL_CTL_DEL, ev->m_fd, NULL);
  ev->m_status = 0;
 
  return ;
}

/* Callback: accept a connection */
void acceptconnect(int listen_fd, int event, void *arg)
{
  int                 connect_fd;
  int                 i; // index into the ep_events array
  int                 flag=0;
  char                str[BUFSIZ];
  struct sockaddr_in  connect_socket_addr;
  socklen_t           connect_socket_len = sizeof(connect_socket_addr); // a value-result argument
  /*the caller must initialize it to contain the size (in bytes) of the structure pointed to by addr; on return it will contain the actual size of the peer address.*/

  if ( (connect_fd=accept(listen_fd, (struct sockaddr *)&connect_socket_addr, &connect_socket_len)) <0 )
   /* addrlen cannot be passed as &sizeof(...): the result of sizeof is an unsigned rvalue whose address cannot be taken, and addrlen is a value-result (in/out) parameter, so it needs a real variable. */
  {
     if (errno != EAGAIN && errno != EINTR)
        {
    /* not handled */}
     printf("\n %s: accept, %s \n", __func__, strerror(errno));
     return ;
  }

  do
  {
    for(i=0; i<MAX_EVENTS; i++) // find a free slot i in the global ep_events array (like looking for a -1 entry in a select()-based server)
        if(ep_events[i].m_status == 0)
           break;
    if(i >= MAX_EVENTS)
     {
        printf("\n %s : max connect [%d] \n", __func__, MAX_EVENTS);
        close(connect_fd); // no free slot: drop the connection instead of leaking the descriptor
        return ;
     }

	/* Set non-blocking */
    if((flag = fcntl(connect_fd, F_SETFL, O_NONBLOCK)) <0)
    {
       printf("\n %s: fcntl nonblocking failed, %s \n", __func__, strerror(errno));
       close(connect_fd);
       return ;
    }

    eventset(&ep_events[i], connect_fd, recvdata, &ep_events[i]);
    eventadd(ep_fd, EPOLLIN, &ep_events[i]);

  }while(0);

   printf("\n new connection [%s:%d] [time:%ld] [pos:%d] \n", inet_ntop(AF_INET, &connect_socket_addr.sin_addr, str, sizeof(str)),
                                ntohs(connect_socket_addr.sin_port), (long)ep_events[i].m_lasttime, i);
   return ;
}

/* receive data */
void recvdata(int client_fd, int event, void *arg)
{
    
  int              len;
  struct my_events *ev = (struct my_events *)arg;

  len = recv(client_fd, ev->m_buf, sizeof(ev->m_buf) - 1, 0); // leave room for the terminating '\0'
  //1. remove ev's file descriptor and structure from the red-black tree
  eventdel(ep_fd, ev);

  if (len >0)
  {
      ev->m_buf_len      = len;
      ev->m_buf[len] = '\0'; // NUL-terminate manually
      printf("\n Client[%d]: %s \n", client_fd, ev->m_buf);

      eventset(ev, client_fd, senddata, ev); //2. set the callback for client_fd to senddata
      eventadd(ep_fd, EPOLLOUT, ev); //3. put ev's file descriptor and structure back on the red-black tree, monitoring the write event EPOLLOUT
  }
  else if (len == 0)
  {
    
      close(ev->m_fd);
      eventdel(ep_fd, ev);
      printf("\n [Client:%d] disconnection \n", ev->m_fd);
  }
  else
  {
    
      close(ev->m_fd);
      eventdel(ep_fd, ev);
      printf("\n error: [Client:%d] disconnection\n", ev->m_fd);
  }
 
  return ;
}

/* send data */
void senddata(int client_fd, int event, void *arg)
{
    
  int              len; 
  struct my_events *ev = (struct my_events *)arg;
 
  len = send(client_fd, ev->m_buf, ev->m_buf_len, 0);   // echo the data back

  if (len > 0)
  {
     printf("\n send[fd=%d], [len=%d] %s \n", client_fd, len, ev->m_buf);
     eventdel(ep_fd, ev);  //1. remove ev's file descriptor and structure from the red-black tree
     eventset(ev, client_fd, recvdata, ev); //2. set the callback for client_fd to recvdata
     eventadd(ep_fd, EPOLLIN, ev); //3. put ev's file descriptor and structure back on the red-black tree, monitoring the read event EPOLLIN
  }
  else
  {
     eventdel(ep_fd, ev);  // remove from the tree before closing the descriptor
     close(ev->m_fd);
     printf("\n send[fd=%d] error \n", client_fd);
  }
  return ;
}

III. epoll reactor model: client code

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>


#define MAX_LINE (1024)
#define SERVER_PORT (8888)

void setnoblocking(int fd)
{
    
  int opts=fcntl(fd,F_GETFL);
  opts=opts|O_NONBLOCK;
  fcntl(fd,F_SETFL,opts);
}

int main(int argc,char* argv[])
{
    
  int sockfd;
  char recvbuf[MAX_LINE+1]={0};/* A local variable has an indeterminate value unless it is initialized, so initialize it explicitly. {0} sets the first element to 0 and the remaining elements are zero-initialized as well. */

  struct sockaddr_in server_addr;

  /* if(argc!=2) { fprintf(stderr,"usage ./cli <SERVER_IP> \n"); exit(0); } */

  if((sockfd=socket(AF_INET,SOCK_STREAM,0))<0)
  {
    
    fprintf(stderr,"socket error\n");
    exit(1);
  }

  bzero(&server_addr,sizeof(server_addr));
  server_addr.sin_family=AF_INET;
  server_addr.sin_port=htons(SERVER_PORT);
  server_addr.sin_addr.s_addr=inet_addr("127.0.0.1");  
  /* if(inet_pton(AF_INET,argv[1],&server_addr.sin_addr)<=0) { fprintf(stderr,"inet_pton error for %s",argv[1]); exit(0); } */
  if(connect(sockfd,(struct sockaddr*)&server_addr,sizeof(server_addr))<0)
  {
    
    perror("connect");
    exit(1);
  }
  setnoblocking(sockfd);

  char input[100];
  int n=0;
  int count=0;

  while(fgets(input,100,stdin)!=NULL)// reads at most 99 characters per call, until stdin reaches end of file
  /* fgets() reads in at most one less than size characters from stream and stores them into the buffer pointed to by s.*/
  {
    
    printf("[send]:%s\n",input);
    n=send(sockfd,input,strlen(input),0);
    if(n<0)
    {
    
      perror("send");
    }
    n=0;
    count=0;

    while(1)
    {
    
      n=read(sockfd,recvbuf+count,MAX_LINE-count);/* On a non-blocking socket this can fail with "Resource temporarily unavailable" (EAGAIN) when the reply has not arrived yet, because the read follows the send so closely. EAGAIN means the operation would block: the read buffer is empty on read, or the write buffer is full on write. */
      if(n<0)
      {
        if(errno == EAGAIN)// workaround: try again
          continue;
        perror("recv");
        break;
      }
      else
      {
        count+=n;
        recvbuf[count]=0;// same as '\0'
        printf("[recv]:%s\n",recvbuf);
        break;
      }
    }
  }

  return 0;
}

Additional content

I came across the following figure while looking things up online (if I remember correctly, it comes from the Dark Horse Linux network programming course). It helps in understanding how the epoll reactor works underneath. The kernel and server-process parts are easy to follow, but I cannot yet confirm whether mmap in user space really works the way the figure describes. I will come back and study this in more depth after I finish my web server project (coming soon :)
[Figure: how epoll works underneath, covering the kernel, the server process, and user-space mmap]

Attachment

More code details are in this XMind file (note that it costs 0 download points; don't be put off by the "VIP" label :)
epoll reactor function code diagram

Original site

Copyright notice
This article was written by [Halya]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/02/202202210517411747.html