
The uv_loop_init() process

2022-06-22 03:02:00 Call me Xiao Huang

Preface

libuv is a network library written in pure C and widely used from C/C++. Compared with asio, it additionally provides a thread pool and process-related facilities.
I spent some time reading the libuv source code. Although libuv is also built on the reactor network model, the design of its event loop differs considerably from muduo and asio. In asio and muduo, tasks are divided into asynchronous tasks and ordinary tasks, but libuv makes no such distinction. Instead, libuv abstracts everything into handles and requests. A handle corresponds to an operation with a long life cycle, that is, an object that may be triggered on every iteration of the event loop. A request is an object that is triggered only once, and generally corresponds to a single read or write.

We start with uv_loop_init(), the function that initializes the event loop. I will explain it with simplified code where possible; it is recommended to download the source code and follow along.

uv_loop_init()

Queue

Macros such as QUEUE_INIT implement insertion, deletion, lookup, and similar operations on a ring (circular doubly linked) queue. A queue element is an array[2]: array[0] points to the next element and array[1] points to the previous element.
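The layout described above can be sketched as follows. This is a minimal illustration, not libuv's code: the `ring_*` functions and `struct item` are hypothetical names, and plain functions are used instead of macros for readability.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical ring-queue node: link[0] = next, link[1] = previous. */
typedef struct ring_node {
  struct ring_node *link[2];
} ring_node;

/* An empty queue is a head node that points to itself in both directions. */
void ring_init(ring_node *head) {
  head->link[0] = head;
  head->link[1] = head;
}

int ring_empty(const ring_node *head) {
  return head->link[0] == head;
}

/* Insert node just before head, i.e. at the tail of the queue. */
void ring_insert_tail(ring_node *head, ring_node *node) {
  node->link[0] = head;
  node->link[1] = head->link[1];
  head->link[1]->link[0] = node;
  head->link[1] = node;
}

/* Unlink node from the queue it currently belongs to. */
void ring_remove(ring_node *node) {
  node->link[1]->link[0] = node->link[0];
  node->link[0]->link[1] = node->link[1];
}

/* The node is embedded in the item, so the queue never allocates memory. */
struct item {
  int value;
  ring_node node;
};

/* Recover the enclosing item from the address of its embedded node. */
struct item *item_from_node(ring_node *n) {
  return (struct item *)((char *)n - offsetof(struct item, node));
}

/* Walk the ring once, starting after head and stopping when head is reached. */
int ring_sum(ring_node *head) {
  int sum = 0;
  ring_node *n;
  for (n = head->link[0]; n != head; n = n->link[0])
    sum += item_from_node(n)->value;
  return sum;
}
```

Because the links live inside the queued object, one object can sit in several queues at once by embedding several nodes, which is presumably why a loop can keep so many separate queues cheaply.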

Code

The code below omits some less important logic.

int uv_loop_init(uv_loop_t *loop)
{
  uv__loop_internal_fields_t *lfields;
  void *saved_data;
  int err;

  saved_data = loop->data;        // Save loop->data so the memset below does not wipe the user's pointer
  memset(loop, 0, sizeof(*loop)); // Zero the whole structure
  loop->data = saved_data;

  // Allocate uv__loop_internal_fields_t on the heap
  lfields = (uv__loop_internal_fields_t *)uv__calloc(1, sizeof(*lfields));
  if (lfields == NULL)
    // Dynamic memory allocation failed
    return UV_ENOMEM;
  loop->internal_fields = lfields;

  // Initialize the lock that protects loop_metrics
  err = uv_mutex_init(&lfields->loop_metrics.lock);
  if (err)
    goto fail_metrics_mutex_init; // Jump to the error handler

  // Initialize the loop's data structures
  heap_init((struct heap *)&loop->timer_heap); // Timer min-heap
  QUEUE_INIT(&loop->wq);              // Finished thread-pool work
  QUEUE_INIT(&loop->idle_handles);    // Idle handles
  QUEUE_INIT(&loop->async_handles);   // Async handles
  QUEUE_INIT(&loop->check_handles);   // Check handles
  QUEUE_INIT(&loop->prepare_handles); // Prepare handles
  QUEUE_INIT(&loop->handle_queue);    // All handles that belong to this loop
  QUEUE_INIT(&loop->pending_queue);
  QUEUE_INIT(&loop->watcher_queue);
  uv__update_time(loop); // Initialize the loop's current time, in ms

  err = uv__platform_loop_init(loop); // Create the epoll instance and store its fd in loop->backend_fd
  if (err)
    goto fail_platform_init;

  uv__signal_global_once_init(); // One-time setup for signal handling (libuv wraps its own signal mechanism)
  err = uv_signal_init(loop, &loop->child_watcher);
  if (err)
    goto fail_signal_init;

  QUEUE_INIT(&loop->process_handles);

  err = uv_async_init(loop, &loop->wq_async, uv__work_done); // Initialize wq_async with uv__work_done as its callback
  if (err)
    goto fail_async_init;

  return 0;

  // Error-handling labels omitted here; they are reached through the goto statements above
}
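The two idioms used above, preserving a user pointer across memset() and unwinding with goto labels, can be shown in isolation. The sketch below uses a hypothetical `struct demo_loop` with two heap buffers; it does not reproduce libuv's actual cleanup labels, which are omitted in the listing above.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical object with a user pointer and two separately allocated parts. */
struct demo_loop {
  void *data;      /* user pointer that must survive initialization */
  int *timers;
  int *watchers;
};

/* Returns 0 on success, -1 on failure. fail_at forces a failure at step 1 or 2
 * so the unwind path can be exercised; 0 means no forced failure. */
int demo_loop_init(struct demo_loop *loop, int fail_at) {
  void *saved_data = loop->data;   /* keep the user pointer */
  memset(loop, 0, sizeof(*loop));  /* zero everything else */
  loop->data = saved_data;

  loop->timers = (fail_at == 1) ? NULL : calloc(4, sizeof(int));
  if (loop->timers == NULL)
    goto fail_timers;

  loop->watchers = (fail_at == 2) ? NULL : calloc(4, sizeof(int));
  if (loop->watchers == NULL)
    goto fail_watchers;

  return 0;

  /* Labels appear in reverse order of acquisition, so each failure point
   * falls through exactly the cleanup steps it needs. */
fail_watchers:
  free(loop->timers);
  loop->timers = NULL;
fail_timers:
  return -1;
}

void demo_loop_close(struct demo_loop *loop) {
  free(loop->watchers);
  free(loop->timers);
  loop->watchers = NULL;
  loop->timers = NULL;
}
```

The benefit of this layout is that adding a new initialization step only requires one new label, placed directly above the previous one.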

uv__signal_global_once_init()

This function calls the POSIX API pthread_once(guard, callback) to perform the one-time initialization of the signal-related functionality.
The callback is uv__signal_global_init.
uv__signal_global_init calls pthread_atfork(param1, param2, param3) to register the uv__signal_global_reinit function as a fork handler.

Two POSIX thread library functions are involved here, pthread_once and pthread_atfork:

pthread_once: guarantees that the callback runs only once, no matter how many threads call pthread_once with the same guard.

pthread_atfork: registers callbacks that are invoked around fork(). param1 is called in the parent process context before fork() creates the child; param2 is called in the parent process context after fork() has created the child; param3 is called in the child process context after fork() has created the child.
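How the two calls fit together can be sketched as follows. This is an illustration under assumptions, not libuv's code: `global_once_init`, `on_fork_child`, and the counters are hypothetical names, and only a child-side handler is registered.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static pthread_once_t init_guard = PTHREAD_ONCE_INIT;
static int init_calls = 0;   /* how many times global_init actually ran */
static int in_child = 0;     /* set by the child-side fork handler */

/* Runs in the child process right after fork() returns there. */
static void on_fork_child(void) {
  in_child = 1;
}

/* One-time setup: register a handler for the child side only. */
static void global_init(void) {
  init_calls++;
  if (pthread_atfork(NULL, NULL, on_fork_child) != 0)
    abort();
}

/* May be called any number of times; global_init still runs once. */
void global_once_init(void) {
  pthread_once(&init_guard, global_init);
}

int init_call_count(void) {
  return init_calls;
}

/* Forks and reports whether the child saw the handler run:
 * 1 if yes, 0 if no, -1 on error. */
int child_saw_handler(void) {
  int status;
  pid_t pid = fork();

  if (pid < 0)
    return -1;
  if (pid == 0)
    _exit(in_child ? 0 : 1);   /* child: report through the exit status */
  if (waitpid(pid, &status, 0) < 0)
    return -1;
  return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

Registering the handler inside the once-callback matters because pthread_atfork registrations accumulate: registering on every loop initialization would make the handler run several times per fork.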

Next, let's look at what this step actually does. The real work happens in uv__signal_global_reinit.

uv__signal_global_reinit

static void uv__signal_global_reinit(void)
{
  uv__signal_cleanup();

  /* Create the pipe behind uv__signal_lock_pipefd */
  if (uv__make_pipe(uv__signal_lock_pipefd, 0))
    abort();

  if (uv__signal_unlock())
    abort();
}

void uv__signal_cleanup(void)
{
  /* We can only use signal-safe functions here.
   * That includes read/write and close, fortunately.
   * We do all of this directly here instead of resetting
   * uv__signal_global_init_guard because
   * uv__signal_global_once_init is only called from uv_loop_init
   * and this needs to function in existing loops.
   */
  if (uv__signal_lock_pipefd[0] != -1)
  {
    uv__close(uv__signal_lock_pipefd[0]);
    uv__signal_lock_pipefd[0] = -1;
  }

  if (uv__signal_lock_pipefd[1] != -1)
  {
    uv__close(uv__signal_lock_pipefd[1]);
    uv__signal_lock_pipefd[1] = -1;
  }
}

The reason uv__signal_global_reinit exists is somewhat involved: libuv supports calling fork() from a multithreaded program. When fork() is called, the file descriptors of the calling thread (on Linux, processes and threads are both kernel tasks) leak into the child process, so extra work is needed to clean them up. The forked child has its own copy of uv__signal_lock_pipefd, so it can simply call close on those descriptors without affecting the parent's ability to read and write them. This works because the process is the smallest unit of resource allocation: each process has its own file descriptor table.
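The claim that closing inherited descriptors in the child leaves the parent untouched can be checked with a small experiment. The function below is a hypothetical sketch that uses only pipe(), fork(), and waitpid(); it is not part of libuv.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Returns 1 if the parent can still write to and read from its pipe after a
 * forked child closed its inherited copies of both ends, 0 if the pipe no
 * longer works, and -1 if the setup itself failed. */
int parent_pipe_survives_child_close(void) {
  int fds[2];
  int status;
  char c = 0;
  pid_t pid;

  if (pipe(fds) != 0)
    return -1;

  pid = fork();
  if (pid < 0)
    return -1;

  if (pid == 0) {
    /* Child: these calls only drop entries in the child's descriptor table. */
    close(fds[0]);
    close(fds[1]);
    _exit(0);
  }

  /* Parent: wait until the child has closed its copies and exited. */
  if (waitpid(pid, &status, 0) < 0)
    return -1;

  /* The parent's descriptors are untouched, so the pipe still works. */
  if (write(fds[1], "x", 1) != 1)
    return 0;
  if (read(fds[0], &c, 1) != 1)
    return 0;

  close(fds[0]);
  close(fds[1]);
  return c == 'x';
}
```

After closing the inherited ends, a child that wants its own independent pipe has to create a fresh one, which matches the close-then-recreate order in uv__signal_global_reinit.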

uv__signal_global_reinit execution flow

The execution logic of this part of the code is therefore: when the main thread calls uv__signal_global_init, if uv__signal_lock_pipefd has not been initialized yet, uv__signal_global_reinit is registered so that a forked child process re-initializes its own uv__signal_lock_pipefd -> uv__signal_global_reinit is called to initialize the current process -> a pipe is created to initialize uv__signal_lock_pipefd for the current process (thread). Note that a pipe is a one-way channel, and creating one returns two file descriptors, one for each end.

uv_signal_init()

It is used as follows:

err = uv_signal_init(loop, &loop->child_watcher);
The purpose of this call is to add the handle_queue node of loop->child_watcher to the event loop's handle_queue ring queue.

uv_async_init()

This function initializes libuv's asynchronous event mechanism.

int uv_async_init(uv_loop_t *loop, uv_async_t *handle, uv_async_cb async_cb)
{
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t *)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); // Append &handle->queue to the tail of &loop->async_handles
  uv__handle_start(handle);                                // Mark the handle as active and increase the count

  return 0;
}

uv__work_done

This function runs the user callbacks bound to the asynchronous work objects queued behind the loop's uv_async_t.

void uv__work_done(uv_async_t *handle)
{
  struct uv__work *w;
  uv_loop_t *loop;
  QUEUE *q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq); // Move every element of loop->wq into the local queue wq
  uv_mutex_unlock(&loop->wq_mutex);

  // Run the completion callback of every task, outside the lock
  while (!QUEUE_EMPTY(&wq))
  {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}
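The key pattern in uv__work_done is to detach the whole queue while holding the mutex and to run the callbacks only after releasing it. A minimal sketch of that pattern, assuming a singly linked list instead of libuv's ring queue (so items are drained newest first), with hypothetical names throughout:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical finished-work item; libuv's struct uv__work is different. */
struct work_item {
  struct work_item *next;
  void (*done)(struct work_item *w, int status);
  int cancelled;
};

struct work_queue {
  pthread_mutex_t mutex;
  struct work_item *head;   /* finished work waiting for its callback */
};

int done_ok = 0;            /* tallies kept by the example callback */
int done_cancelled = 0;

/* Example completion callback: counts successes and cancellations. */
void count_done(struct work_item *w, int status) {
  (void)w;
  if (status == 0)
    done_ok++;
  else
    done_cancelled++;
}

void work_queue_init(struct work_queue *q) {
  pthread_mutex_init(&q->mutex, NULL);
  q->head = NULL;
}

/* Called by a worker thread once a task has finished. */
void work_queue_post(struct work_queue *q, struct work_item *w) {
  pthread_mutex_lock(&q->mutex);
  w->next = q->head;
  q->head = w;
  pthread_mutex_unlock(&q->mutex);
}

/* Called on the loop thread. Returns the number of callbacks that ran. */
int work_queue_drain(struct work_queue *q) {
  struct work_item *list;
  int count = 0;

  pthread_mutex_lock(&q->mutex);
  list = q->head;           /* take the whole list in one step */
  q->head = NULL;
  pthread_mutex_unlock(&q->mutex);

  while (list != NULL) {
    struct work_item *w = list;
    list = w->next;         /* advance first: done() may release w */
    w->done(w, w->cancelled ? -1 : 0);
    count++;
  }
  return count;
}
```

Holding the lock only for the pointer swap keeps worker threads from blocking on slow user callbacks, and it lets a callback post new work without deadlocking.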

uv__async_io

Runs the async_cb of every pending uv_async_t in the event loop (for wq_async this is uv__work_done). It is called when there is data to read on loop->async_io_watcher.fd.

static void uv__async_io(uv_loop_t *loop, uv__io_t *w, unsigned int events)
{
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE *q;
  uv_async_t *h;

  assert(w == &loop->async_io_watcher);

  // Drain the fd: keep reading while the buffer comes back full, stop once the data has been read
  for (;;)
  {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  // Process every handle in async_handles
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue))
  {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    if (0 == uv__async_spin(h))
      continue; /* Not pending. */

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h); // For wq_async, async_cb is uv__work_done
  }
}
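The read loop at the top of uv__async_io is a standard way to drain a non-blocking descriptor. The sketch below isolates it; `drain_fd` and `make_nonblocking_pipe` are hypothetical helpers, and unlike the listing above the sketch returns an error code instead of calling abort().

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Read and discard everything currently buffered on a non-blocking fd.
 * Returns the number of bytes discarded, or -1 on an unexpected error. */
long drain_fd(int fd) {
  char buf[64];
  long total = 0;

  for (;;) {
    ssize_t r = read(fd, buf, sizeof(buf));

    if (r > 0) {
      total += r;
      if ((size_t)r == sizeof(buf))
        continue;   /* buffer came back full: more data may be waiting */
      break;        /* short read: the fd is drained */
    }
    if (r == 0)
      break;        /* end of file: every writer has closed */
    if (errno == EINTR)
      continue;     /* interrupted by a signal: retry */
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;        /* nothing left to read */
    return -1;
  }
  return total;
}

/* Create a pipe with both ends set to non-blocking. Returns 0 on success. */
int make_nonblocking_pipe(int fds[2]) {
  int i;

  if (pipe(fds) != 0)
    return -1;
  for (i = 0; i < 2; i++) {
    int flags = fcntl(fds[i], F_GETFL);
    if (flags == -1 || fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) == -1) {
      close(fds[0]);
      close(fds[1]);
      return -1;
    }
  }
  return 0;
}
```

Draining before running callbacks means that many wakeups written in quick succession collapse into a single pass over the handles.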

Summary of uv_async_init()

uv_async_init() mainly does the following:

  • Creates the file descriptor behind loop->async_io_watcher.fd, which is used to hand control over between threads
  • Sets the async_cb of loop->wq_async to uv__work_done
  • Sets loop->async_io_watcher.cb to uv__async_io
  • Adds loop->async_io_watcher.fd to the set of descriptors the event loop monitors, so that the file descriptor is added to epoll
    uv__async_io calls the async_cb of every pending async handle (uv__work_done for wq_async), which in turn runs the callbacks of the task objects in its task queue (the user callbacks)
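The wakeup mechanism summarized above can be sketched with a plain pipe and poll(). This is an illustration under assumptions: `struct waker` and its functions are hypothetical, poll() stands in for epoll, and the descriptor libuv actually creates may be of a different kind.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical stand-in for an async handle: a wakeup pipe plus a callback. */
struct waker {
  int fds[2];                   /* fds[0]: read end the loop watches; fds[1]: write end */
  void (*cb)(struct waker *w);
  int fired;                    /* number of times the callback ran */
};

/* Example callback: just records that it ran. */
void waker_mark_fired(struct waker *w) {
  w->fired++;
}

/* Returns 0 on success, -1 if the pipe could not be created. */
int waker_init(struct waker *w, void (*cb)(struct waker *w)) {
  w->cb = cb;
  w->fired = 0;
  return pipe(w->fds);
}

/* Typically called from another thread: one byte is enough to wake the poller. */
int waker_send(struct waker *w) {
  return write(w->fds[1], "!", 1) == 1 ? 0 : -1;
}

/* Loop side: wait until the read end is readable, consume the byte, and run
 * the callback. Returns 0 on success, -1 on timeout or error. */
int waker_wait(struct waker *w, int timeout_ms) {
  struct pollfd p;
  char c;

  p.fd = w->fds[0];
  p.events = POLLIN;
  p.revents = 0;
  if (poll(&p, 1, timeout_ms) != 1 || !(p.revents & POLLIN))
    return -1;
  if (read(w->fds[0], &c, 1) != 1)
    return -1;
  w->cb(w);
  return 0;
}
```

The design choice worth noting is that the callback always runs on the thread that polls, so user code behind the callback does not need its own locking against the loop.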

[Figure: flow chart of uv_async_init (image not available)]

Copyright notice
This article was written by [Call me Xiao Huang]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/173/202206220256138010.html