Redis source code and design analysis -- 17. Redis event processing
2022-07-25 17:46:00 【JunesFour】
Redis Event handling
1. Introduction to Redis events
The Redis server is an event-driven program. Event-driven means that when you type a command and press Enter, the client assembles the message into the Redis protocol format and sends it to the Redis server, which generates an event. The server receives the command, processes it, and sends a reply. While nothing is interacting with the server, it blocks and waits: it yields the CPU and goes to sleep, and the operating system wakes it up when an event fires.
The Redis server handles two types of events:
- File events: the Redis server connects to clients (or other Redis servers) through sockets, and a file event is the server's abstraction of a socket operation. Communication between the server and a client (or another server) generates file events, and the server listens for and handles these events to carry out its network communication.
- Time events: some operations in the Redis server (such as the serverCron function) need to run at a given point in time, and a time event is the server's abstraction of such a timed operation.
2. Event abstractions
Redis abstracts file events and time events into one data structure each in order to manage them.
2.1 File event structure
typedef struct aeFileEvent {
    // File event type: AE_NONE, AE_READABLE, AE_WRITABLE
    int mask;
    // Handler for readable events
    aeFileProc *rfileProc;
    // Handler for writable events
    aeFileProc *wfileProc;
    // Private data passed to the handlers
    void *clientData;
} aeFileEvent; // File event
The rfileProc and wfileProc members are function pointers with the following prototype:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
These are callback functions: when an event of the type specified by the file event occurs, the corresponding callback is called to handle it.
When an event becomes ready, we need its file descriptor and its event type to identify it, so the aeFiredEvent structure is defined to hold both:
typedef struct aeFiredEvent {
    // File descriptor of the ready event
    int fd;
    // Type of the ready event: AE_NONE, AE_READABLE, AE_WRITABLE
    int mask;
} aeFiredEvent; // Ready event
File event types:
#define AE_NONE 0       // Not set
#define AE_READABLE 1   // Readable event
#define AE_WRITABLE 2   // Writable event
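Because the event types are distinct bits, one mask can hold several of them at once. The following is a minimal sketch of that bit handling; the demo_mask_* helpers are hypothetical names used only for illustration and are not part of the Redis source.

```c
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2

/* Hypothetical helper: add event bits to an existing mask. */
static int demo_mask_add(int current, int added) {
    return current | added;
}

/* Hypothetical helper: remove event bits from an existing mask. */
static int demo_mask_del(int current, int removed) {
    return current & ~removed;
}

/* Hypothetical helper: return nonzero if the given bit is set in the mask. */
static int demo_mask_has(int mask, int bit) {
    return (mask & bit) != 0;
}
```

For example, adding AE_WRITABLE to a mask that already holds AE_READABLE gives a mask with both bits set, and removing AE_READABLE from it leaves only AE_WRITABLE.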
2.2 Time event structure
typedef struct aeTimeEvent {
    // ID of the time event
    long long id;
    // Seconds part of the time at which the event is due
    long when_sec; /* seconds */
    // Milliseconds part of the time at which the event is due
    long when_ms; /* milliseconds */
    // Time event handler
    aeTimeProc *timeProc;
    // Time event finalizer
    aeEventFinalizerProc *finalizerProc;
    // Private data passed to the handlers
    void *clientData;
    // Pointer to the next time event
    struct aeTimeEvent *next;
} aeTimeEvent; // Time event
As you can see, a time event structure is a node in a linked list, because struct aeTimeEvent *next points to the next time event.
As with file events, when a time event becomes due, the corresponding callback function is called. The structure members timeProc and finalizerProc are both callback functions, with the following prototypes:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
Although file events and time events each have their own abstraction, Redis still needs an overall abstraction that describes the state of event handling as a whole. This is the event state structure introduced next: aeEventLoop.
2.3 Event state structure
typedef struct aeEventLoop {
    // Highest file descriptor currently registered
    int maxfd; /* highest file descriptor currently registered */
    // Maximum number of file descriptors tracked
    int setsize; /* max number of file descriptors tracked */
    // ID to assign to the next time event
    long long timeEventNextId;
    // Time at which time events were last processed
    time_t lastTime; /* Used to detect system clock skew */
    // Table of registered file events
    aeFileEvent *events; /* Registered events */
    // Table of ready file events
    aeFiredEvent *fired; /* Fired events */
    // Pointer to the head node of the time event list
    aeTimeEvent *timeEventHead;
    // Switch that stops event processing
    int stop;
    // Event state of the multiplexing library
    void *apidata; /* This is used for polling API specific data */
    // Function to run before events are processed
    aeBeforeSleepProc *beforesleep;
} aeEventLoop; // State structure of the event loop
The aeEventLoop structure holds apidata, a generic void * pointer that stores the polling state, that is, the event state of the underlying multiplexing library.
All of Redis's I/O multiplexing functionality is implemented by wrapping the common select, epoll, evport, and kqueue I/O multiplexing libraries. Each one has its own file in the Redis source code, such as ae_select.c and ae_epoll.c.
At compile time, the highest-performance multiplexing library available on the target system is selected as the Redis multiplexing implementation. Because all of the wrappers expose the same API, the underlying implementations are interchangeable.
The following source code selects the specific library:
// Choose the I/O multiplexing library, in descending order of performance; Linux supports "ae_epoll.c" and "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif
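The same compile-time fallback chain can be tried in isolation. The sketch below is only an illustration: the DEMO_HAVE_* macros and demo_backend_name() are hypothetical, and the platform checks that set them are a simplified assumption rather than the checks Redis itself uses to define HAVE_EVPORT, HAVE_EPOLL, and HAVE_KQUEUE.

```c
/* Hypothetical stand-ins for the HAVE_* macros; the platform tests are a
 * simplified assumption made for this sketch only. */
#if defined(__sun)
#define DEMO_HAVE_EVPORT 1
#elif defined(__linux__)
#define DEMO_HAVE_EPOLL 1
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__)
#define DEMO_HAVE_KQUEUE 1
#endif

/* Return the name of the backend the fallback chain would pick,
 * trying the candidates in the same order as the listing above. */
static const char *demo_backend_name(void) {
#if defined(DEMO_HAVE_EVPORT)
    return "evport";
#elif defined(DEMO_HAVE_EPOLL)
    return "epoll";
#elif defined(DEMO_HAVE_KQUEUE)
    return "kqueue";
#else
    return "select";
#endif
}
```

Since the choice is made by the preprocessor, only one backend is compiled into the binary, and select is the fallback when none of the others is available.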
At runtime, the INFO server command reports which multiplexing library is in use, in its multiplexing_api field.
On Linux the default is the epoll multiplexing library, so apidata holds the event state structure of the epoll model, which is defined in the ae_epoll.c source file:
typedef struct aeApiState {
    // File descriptor of the epoll instance
    int epfd;
    // Event table
    struct epoll_event *events;
} aeApiState; // Event state
epoll's struct epoll_event defines epoll's own event types, such as EPOLLIN and EPOLLOUT, while the mask field of the Redis file event structure aeFileEvent uses Redis's own event types, such as AE_READABLE and AE_WRITABLE. An intermediate layer is therefore needed to map between the two. That layer is the common API mentioned earlier, implemented in ae_epoll.c:
// Create an epoll instance and save it in eventLoop
static int aeApiCreate(aeEventLoop *eventLoop);
// Resize the event table
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
// Free the epoll instance and the event table
static void aeApiFree(aeEventLoop *eventLoop);
// Register events for fd in the event table identified by epfd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
// Remove events for fd from the event table identified by epfd
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
// Wait for events on the monitored file descriptors
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
// Return the name of the I/O multiplexing library in use
static char *aeApiName(void);
These APIs wrap epoll's underlying functions; when implementing events, Redis only needs to call these interfaces.
Let's look at the source code of two of these APIs as examples:
aeApiAddEvent
This function registers with epoll the events of a file descriptor tracked in the events table of the Redis event state structure aeEventLoop; it corresponds to the epoll_ctl function.
// Register events for fd in the event table identified by epfd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    // EPOLL_CTL_ADD registers fd with epfd for the first time
    // EPOLL_CTL_MOD modifies the events already registered for fd
    // If fd has no events registered yet (AE_NONE), add it; otherwise modify it
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // struct epoll_event {
    //     uint32_t events;   /* Epoll events */
    //     epoll_data_t data; /* User data variable */
    // };
    ee.events = 0;
    // When modifying, merge in the event types that were already registered
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // Map the AE_* mask to epoll event types
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   // Read events
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  // Write events
    ee.data.fd = fd; // File descriptor the event belongs to
    // Register the event ee with epoll
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
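The AE_* to EPOLL* mapping can be exercised on its own with a pipe. This is a minimal, Linux-only sketch: demo_ae_to_epoll() and demo_pipe_readable() are hypothetical helpers written for this illustration, not Redis functions, and error handling is reduced to returning -1.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define AE_READABLE 1
#define AE_WRITABLE 2

/* Hypothetical helper: translate an AE_* mask into epoll event bits,
 * using the same mapping as aeApiAddEvent. */
static unsigned int demo_ae_to_epoll(int mask) {
    unsigned int events = 0;
    if (mask & AE_READABLE) events |= EPOLLIN;
    if (mask & AE_WRITABLE) events |= EPOLLOUT;
    return events;
}

/* Hypothetical demo: register the read end of a pipe for AE_READABLE,
 * write one byte to the pipe, and return the AE_* mask that epoll reports
 * for the read end (or -1 on any error). */
static int demo_pipe_readable(void) {
    int fds[2];
    int result = -1;
    if (pipe(fds) == -1) return -1;
    int epfd = epoll_create(1024); /* the size hint only has to be positive */
    if (epfd != -1) {
        struct epoll_event ee = {0};
        ee.events = demo_ae_to_epoll(AE_READABLE);
        ee.data.fd = fds[0];
        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ee) == 0 &&
            write(fds[1], "x", 1) == 1) {
            struct epoll_event fired;
            /* Wait at most one second for the read end to become ready. */
            int n = epoll_wait(epfd, &fired, 1, 1000);
            if (n == 1 && fired.data.fd == fds[0]) {
                result = 0;
                if (fired.events & EPOLLIN) result |= AE_READABLE;
                if (fired.events & EPOLLOUT) result |= AE_WRITABLE;
            }
        }
        close(epfd);
    }
    close(fds[0]);
    close(fds[1]);
    return result;
}
```

Calling demo_pipe_readable() from a small main() should report only AE_READABLE, because the read end of a pipe never becomes writable.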
aeApiPoll
This function waits for events on the monitored file descriptors; it corresponds to the underlying epoll_wait function.
// Wait for events on the monitored file descriptors
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // Wait for events in the event table; the timeout is in milliseconds, -1 blocks indefinitely
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // At least one event is ready
    if (retval > 0) {
        int j;
        numevents = retval;
        // Traverse the ready events and copy them into eventLoop's ready event table
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            // Set mask according to the type of the ready event
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // Add to the ready event table
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // Return the number of ready events
    return numevents;
}
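The timeout argument passed to epoll_wait above is a millisecond count derived from a struct timeval. A minimal sketch of that conversion follows; demo_timeval_to_ms() is a hypothetical name used only for this illustration.

```c
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper: convert a struct timeval into the millisecond
 * timeout expected by epoll_wait. A NULL pointer means "block until an
 * event arrives", which epoll_wait expresses as -1. */
static int demo_timeval_to_ms(const struct timeval *tvp) {
    if (tvp == NULL) return -1;
    return (int)(tvp->tv_sec * 1000 + tvp->tv_usec / 1000);
}
```

Note that the integer division drops any sub-millisecond remainder, so a timeval of 999 microseconds becomes a timeout of 0, that is, a non-blocking poll.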
3. Event implementation
All of the event source code is defined in the ae.c source file. Let's start with the aeMain function.
// Main function of the event loop
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // Keep processing events until asked to stop
    while (!eventLoop->stop) {
        // Run the beforesleep function before processing events
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // Process the time events that are due and the file events that are ready
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}
As you can see, as long as the server keeps processing events this is an infinite loop, which is the most typical shape of an event-driven program. Inside the loop, the program calls the event-processing function aeProcessEvents(), whose arguments are the event state structure aeEventLoop and AE_ALL_EVENTS.
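The shape of this loop can be reproduced with a toy structure. In the sketch below every name (demoLoop, demo_main, and so on) is hypothetical, and the event-processing step is a stand-in that simply asks the loop to stop after three iterations.

```c
#include <stddef.h>

/* Hypothetical, stripped-down loop state: a stop switch, an iteration
 * counter, and an optional hook that runs before each iteration. */
typedef struct demoLoop {
    int stop;
    int iterations;
    void (*beforesleep)(struct demoLoop *loop);
} demoLoop;

static int demo_before_count = 0;

/* Hook that only counts how many times it was called. */
static void demo_count_before(demoLoop *loop) {
    (void)loop;
    demo_before_count++;
}

/* Stand-in for event processing: pretend a handler asks the loop
 * to stop once three iterations have run. */
static void demo_process_events(demoLoop *loop) {
    loop->iterations++;
    if (loop->iterations >= 3) loop->stop = 1;
}

/* Same structure as aeMain: clear the stop switch, then call the hook
 * and process events until something sets the switch. */
static void demo_main(demoLoop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL)
            loop->beforesleep(loop);
        demo_process_events(loop);
    }
}
```

The loop ends only because something inside the processing step sets the stop switch; without that, it would keep running.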
The event type macros are defined in the ae.h header file:
#define AE_FILE_EVENTS 1  // File events
#define AE_TIME_EVENTS 2  // Time events
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)  // File and time events
#define AE_DONT_WAIT 4    // Do not block waiting
// Process the time events that are due and the file events that are ready
// If flags is 0, the function does nothing and returns
// If flags has AE_ALL_EVENTS set, all types of events are processed
// If flags has AE_FILE_EVENTS set, file events are processed
// If flags has AE_TIME_EVENTS set, time events are processed
// If flags has AE_DONT_WAIT set, the function returns as soon as the events that can be handled without waiting are processed
// The function returns the number of events processed
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    // Return immediately if neither time events nor file events are requested
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // Note that because time events must be processed, we still call select() even when there are no file events to process, in order to sleep until the next time event is due
    // Enter if at least one file event is registered, or if time events are requested and blocking is allowed
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // If time events are requested and blocking is allowed
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the time event that is due soonest
            shortest = aeSearchNearestTimer(eventLoop);
        // A nearest time event was found
        if (shortest) {
            long now_sec, now_ms;
            // Get the current time
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // Milliseconds left until the time event is due
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            // The event is not due yet
            if (ms > 0) {
                // Store the remaining time in tvp
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            // The event is already due, so set tvp to 0
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        // No nearest time event: the time event list is empty or was not searched
        } else {
            // If the non-blocking flag is set
            if (flags & AE_DONT_WAIT) {
                // Set tvp to 0 so that the poll does not block
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // Otherwise block until a file event is ready
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        // Wait for events on the monitored file descriptors
        // If tvp is NULL, block here until a file event is ready; otherwise block for at most the time stored in tvp, after which a time event is due
        // Returns the number of ready file events
        numevents = aeApiPoll(eventLoop, tvp);
        // Traverse the ready file event table
        for (j = 0; j < numevents; j++) {
            // Get the address of the ready file event
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // Get the type and file descriptor of the ready file event
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // If a readable event occurred
            if (fe->mask & mask & AE_READABLE) {
                // Record that the read handler fired and call it
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // If a writable event occurred
            if (fe->mask & mask & AE_WRITABLE) {
                // Call the write handler only if the read handler did not fire or the two handlers differ, to avoid running the same function twice
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++; // One more event processed
        }
    }
    /* Check time events */
    // Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}
When no event has fired and AE_DONT_WAIT is not set, the Redis server blocks and waits. It cannot wait indefinitely, though, because time events still have to be processed. So before calling aeApiPoll, it finds the time event that is due soonest and builds a struct timeval (tv, pointed to by tvp) from the time remaining until then. This value is the maximum time the server may block waiting for file events: if it elapses without a file event firing, aeApiPoll stops blocking and processTimeEvents is called to handle the time events.
If a file event fires during that maximum blocking time, the file events are executed first and the time events afterwards, so time events are usually processed slightly later than scheduled.
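The blocking time is plain arithmetic on the (seconds, milliseconds) pairs. The sketch below isolates it; demo_time_until() is a hypothetical helper that takes the current time as parameters so the result does not depend on the clock.

```c
#include <sys/time.h>

/* Hypothetical helper: compute how long the poll may block before the
 * timer at (when_sec, when_ms) is due, given the current time
 * (now_sec, now_ms). The result is stored in tv and also returned in
 * milliseconds; a timer that is already due yields zero. */
static long long demo_time_until(long when_sec, long when_ms,
                                 long now_sec, long now_ms,
                                 struct timeval *tv) {
    long long ms = (long long)(when_sec - now_sec) * 1000 + when_ms - now_ms;
    if (ms > 0) {
        tv->tv_sec = ms / 1000;
        tv->tv_usec = (ms % 1000) * 1000;
    } else {
        ms = 0;
        tv->tv_sec = 0;
        tv->tv_usec = 0;
    }
    return ms;
}
```

For a timer due at 5 s 200 ms when the current time is 3 s 900 ms, the difference is (5 - 3) * 1000 + 200 - 900 = 1300 ms, so tv holds 1 second and 300000 microseconds.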
The rfileProc and wfileProc functions invoked for file events are callback functions as well. Redis provides several file event handlers for different network communication needs:
- acceptTcpHandler: accepts a client's TCP connect request.
- acceptUnixHandler: accepts a client's local (Unix socket) connect request.
- sendReplyToClient: sends command replies to the client.
- readQueryFromClient: reads the requests sent by the client.
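How aeProcessEvents chooses which of these callbacks to run for a ready descriptor can be sketched in isolation. The types and functions below (demoFileEvent, demo_dispatch, and the two counting handlers) are hypothetical simplifications that drop the event loop argument.

```c
#include <stddef.h>

#define AE_READABLE 1
#define AE_WRITABLE 2

typedef void demoFileProc(int fd, void *clientData, int mask);

/* Hypothetical, simplified file event: registered mask plus two handlers. */
typedef struct demoFileEvent {
    int mask;
    demoFileProc *rfileProc;
    demoFileProc *wfileProc;
    void *clientData;
} demoFileEvent;

/* Two distinct handlers that both increment the int behind clientData. */
static void demo_handler_a(int fd, void *clientData, int mask) {
    (void)fd; (void)mask;
    (*(int *)clientData)++;
}

static void demo_handler_b(int fd, void *clientData, int mask) {
    (void)fd; (void)mask;
    (*(int *)clientData)++;
}

/* Dispatch one ready descriptor the way the loop in aeProcessEvents does:
 * a handler runs only if its event type is both registered and fired, and
 * the write handler is skipped when the same function already ran as the
 * read handler. */
static void demo_dispatch(demoFileEvent *fe, int fd, int fired_mask) {
    int rfired = 0;
    if (fe->mask & fired_mask & AE_READABLE) {
        rfired = 1;
        fe->rfileProc(fd, fe->clientData, fired_mask);
    }
    if (fe->mask & fired_mask & AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
            fe->wfileProc(fd, fe->clientData, fired_mask);
    }
}
```

With the same function registered for both directions, a descriptor that is readable and writable at once triggers a single call; with two different functions it triggers two.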
Next, let's look at the implementation of aeSearchNearestTimer, the function that finds the time event due soonest:
// Search for the time event that is due first
// This is useful to know how long the poll can sleep without delaying any time event
// Returns NULL if the time event list is empty
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // Address of the head node of the time event list
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    // Traverse all time events
    while(te) {
        // Keep the event with the earliest arrival time in nearest
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
This function traverses the time event linked list and finds the event with the smallest arrival time.
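The same minimum search can be tried on a hand-built list. In this sketch demoTimer and demo_nearest() are hypothetical names, and the node keeps only the fields the comparison needs.

```c
#include <stddef.h>

/* Hypothetical, reduced time event node: arrival time and next pointer. */
typedef struct demoTimer {
    long when_sec;
    long when_ms;
    struct demoTimer *next;
} demoTimer;

/* Walk the unsorted list once and return the node with the earliest
 * (when_sec, when_ms) pair, or NULL if the list is empty. */
static demoTimer *demo_nearest(demoTimer *head) {
    demoTimer *nearest = NULL;
    for (demoTimer *t = head; t != NULL; t = t->next) {
        if (nearest == NULL || t->when_sec < nearest->when_sec ||
            (t->when_sec == nearest->when_sec &&
             t->when_ms < nearest->when_ms))
            nearest = t;
    }
    return nearest;
}
```

Because the list is not kept sorted, every call visits each node, so the cost grows linearly with the number of time events.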
Now let's focus on the implementation of processTimeEvents, the function that executes time events:
// Process time events
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);
    // Detect system clock skew: if the current time is earlier than the last time time events were processed, the clock moved backwards
    // In that case set when_sec to 0 for every time event so that all of them are processed as soon as possible
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // Record the current time as the last time time events were processed
    eventLoop->lastTime = now;
    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1; // Largest ID currently in the time event list
    // Traverse the time event list
    while(te) {
        long now_sec, now_ms;
        long long id;
        /* Remove events scheduled for deletion. */
        // If the time event has been marked as deleted
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            // Unlink the node from the time event list
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // Call the finalizer, if any, before freeing the event
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        // Make sure we don't process time events created by time events in this iteration.
        // Note that this check is currently useless: new timers are always added at the head of the list.
        // If the implementation changes, the check may become useful again, so it is kept as a defence for the future.
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // Get the current time
        aeGetTime(&now_sec, &now_ms);
        // The time event is due
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // Call the time event handler
            retval = te->timeProc(eventLoop, id, te->clientData);
            // One more time event processed
            processed++;
            // Periodic event: schedule it again retval milliseconds from now
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            // One-shot event: the handler returned AE_NOMORE (-1), so mark the event for lazy deletion
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // Advance the predecessor pointer and the current pointer
        prev = te;
        te = te->next;
    }
    return processed; // Return the number of time events processed
}
If a time event has been marked as deleted, the callback pointed to by finalizerProc is called (if one is set) and the event is removed from the list. Otherwise, once the event is due, the callback pointed to by timeProc is called to handle it. Redis time events fall into two categories:
- One-shot (timed) events: run a piece of code once after a specified time.
- Periodic events: run a piece of code every specified interval.
If the current time event is periodic, timeProc returns the interval in milliseconds and the event's next arrival time is set to that many milliseconds from now. If it is a one-shot event, timeProc returns AE_NOMORE and the event is marked for deletion.
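The difference between the two categories comes down to the handler's return value. The following is a minimal sketch under simplifying assumptions: all names are hypothetical, time is a single millisecond counter passed in by the caller, and events marked as deleted are only skipped rather than unlinked and freed.

```c
#include <stddef.h>

#define DEMO_NOMORE -1      /* hypothetical stand-in for AE_NOMORE */
#define DEMO_DELETED_ID -1  /* hypothetical stand-in for AE_DELETED_EVENT_ID */

/* Hypothetical, simplified time event. */
typedef struct demoTimer {
    long long id;
    long long when_ms;                 /* absolute due time in milliseconds */
    int (*proc)(struct demoTimer *t);  /* returns next interval or DEMO_NOMORE */
    int fired;                         /* how many times the handler ran */
    struct demoTimer *next;
} demoTimer;

/* One-shot handler: run once, then ask to be removed. */
static int demo_one_shot(demoTimer *t) {
    t->fired++;
    return DEMO_NOMORE;
}

/* Periodic handler: ask to run again 100 ms from now. */
static int demo_every_100ms(demoTimer *t) {
    t->fired++;
    return 100;
}

/* One pass over the list at time now_ms; returns how many handlers ran. */
static int demo_process_timers(demoTimer *head, long long now_ms) {
    int processed = 0;
    for (demoTimer *t = head; t != NULL; t = t->next) {
        if (t->id == DEMO_DELETED_ID) continue; /* lazily deleted earlier */
        if (now_ms >= t->when_ms) {
            int retval = t->proc(t);
            processed++;
            if (retval != DEMO_NOMORE)
                t->when_ms = now_ms + retval;   /* periodic: reschedule */
            else
                t->id = DEMO_DELETED_ID;        /* one-shot: mark deleted */
        }
    }
    return processed;
}
```

After a pass in which both timers are due, the periodic timer has a new due time while the one-shot timer is only marked; a later pass is what would actually remove it from the list in the real implementation.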
References:
Redis Design and Implementation