How to efficiently implement scheduled tasks in Redis

Redis uses a single thread combined with non-blocking I/O multiplexing to handle network I/O and time events efficiently. This article walks through the design and implementation of Redis time events at the source-code level.

Detailed explanation of time events in Redis

Definition of time events

A time event is either a one-shot event, which runs once when it is due and is then destroyed, or a periodic scheduled task. Redis wraps both kinds in an aeTimeEvent object. Each event is uniquely identified by its id, and its due time is stored as a seconds part (when_sec) and a milliseconds part (when_ms). When that time arrives, the function pointed to by timeProc is called. For the Redis server's own scheduled task, timeProc points to serverCron, which periodically performs a range of background jobs that are covered later in this article.

The corresponding code is the aeTimeEvent structure in the header file ae.h. Besides the core fields described above, it has a next pointer that links to the next registered time event:

    /* Time event */
    typedef struct aeTimeEvent {
        /* Time event id, globally increasing */
        long long id; /* time event identifier. */
        /* Time at which the event is due */
        long when_sec; /* seconds */
        long when_ms; /* milliseconds */
        /* Handler called when the event is due */
        aeTimeProc *timeProc;
        //......
        /* Link to the next time event */
        struct aeTimeEvent *next;
    } aeTimeEvent;

As noted above, Redis keeps its time events in a linked list. The list is owned by the event loop object aeEventLoop, whose timeEventHead field points to the first time event; the remaining events are reached by following each event's next pointer.

The corresponding definition of aeEventLoop in ae.h is:

    typedef struct aeEventLoop {
        //......
        /* Head of the list of time events */
        aeTimeEvent *timeEventHead;
        //......
    } aeEventLoop;
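
To make the list structure concrete, here is a minimal sketch of registering events on such a list and finding the one that is due soonest. The type and function names ending in "Sketch" are hypothetical stand-ins, not Redis identifiers, and inserting at the head of the list is an assumption of this sketch rather than something shown in the excerpts above.

```c
#include <stdlib.h>

/* Simplified stand-ins for aeTimeEvent and aeEventLoop (hypothetical names). */
typedef struct timeEventSketch {
    long long id;                 /* globally increasing identifier */
    long when_sec;                /* due time: seconds part */
    long when_ms;                 /* due time: milliseconds part */
    struct timeEventSketch *next; /* next registered time event */
} timeEventSketch;

typedef struct eventLoopSketch {
    long long timeEventNextId;      /* id handed to the next new event */
    timeEventSketch *timeEventHead; /* head of the time event list */
} eventLoopSketch;

/* Register an event due at (when_sec, when_ms) by pushing it onto the head
 * of the list. Returns the new id, or -1 if allocation fails. */
long long addTimeEventSketch(eventLoopSketch *el, long when_sec, long when_ms) {
    timeEventSketch *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = el->timeEventNextId++;
    te->when_sec = when_sec;
    te->when_ms = when_ms;
    te->next = el->timeEventHead;
    el->timeEventHead = te;
    return te->id;
}

/* Walk the list and return the event with the earliest due time, or NULL
 * if the list is empty. The list is unsorted, so this is O(n). */
timeEventSketch *nearestTimeEventSketch(eventLoopSketch *el) {
    timeEventSketch *te = el->timeEventHead, *nearest = NULL;
    while (te) {
        if (nearest == NULL || te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
```

Starting from an eventLoopSketch initialised to {0, NULL}, each call to addTimeEventSketch returns the next id (0, 1, 2, ...) and the newest event becomes the head. Because the list is unsorted, finding the next due event costs a full walk, which is acceptable when only a handful of time events are registered.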

Registering time events

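During server initialization, Redis registers a time event for serverCron. The event first fires about 1 millisecond after registration and then repeats at the interval that serverCron returns, which is 1000/server.hz milliseconds (100 milliseconds with the default hz of 10). The main jobs of this event are to: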

  • Update the cached Redis global clock, so that other code can read the time from a global variable.
  • Randomly sample keys from the in-memory database and delete the key-value pairs that have expired.
  • If an AOF rewrite is detected to have completed, perform the follow-up flush to disk.
  • If the current AOF file is found to be too large, fork a child process to rewrite it.
  • .......

The registration happens in initServer, which is called when Redis starts. It calls aeCreateTimeEvent to wrap serverCron in a time event; the second argument, 1, is the delay in milliseconds before the first run:

    void initServer(void) {
        //......
        /* Create the serverCron() time event, that's our main way to process
         * background operations. */
        /* Create the time event and register it on eventLoop->timeEventHead */
        if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            redisPanic("Can't create the serverCron time event.");
            exit(1);
        }
        //......
    }
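
The due time stored in when_sec and when_ms is "now plus a delay in milliseconds". The following is a minimal sketch of that arithmetic, in the spirit of the aeAddMillisecondsToNow helper that appears later in this article. The function name addMillisecondsSketch is hypothetical, and the current time is passed in as parameters (an assumption made so the function is easy to test) instead of being read from the system clock.

```c
/* Compute (cur_sec, cur_ms) + milliseconds and store the result in
 * *sec and *ms, carrying into the seconds part so that the
 * milliseconds part stays in the range [0, 999]. */
void addMillisecondsSketch(long cur_sec, long cur_ms, long long milliseconds,
                           long *sec, long *ms) {
    long when_sec = cur_sec + (long)(milliseconds / 1000);
    long when_ms = cur_ms + (long)(milliseconds % 1000);
    if (when_ms >= 1000) {
        when_sec++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
```

For example, adding a delay of 100 milliseconds to the time (100 s, 950 ms) yields (101 s, 50 ms), because the milliseconds part overflows and carries one second.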

Polling for processing time events

In each iteration of the event loop, after Redis has handled the file events that are ready (that is, the client requests), it calls processTimeEvents to run the time events that are due, so that they execute as close to on time as possible. A one-shot event is deleted after it runs; a periodic event has its next execution time set instead. When all of this is done, the function returns the number of time events it processed.

The entry point of the event loop is aeMain, the core loop of Redis. It repeatedly calls aeProcessEvents to handle all kinds of events:

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            /* Process all kinds of events */
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

Inside aeProcessEvents, once the ready file events have been handled, processTimeEvents is called to run the time events that are due:

    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        //......
            /* Handle the client events that are ready */
            numevents = aeApiPoll(eventLoop, tvp);
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int rfired = 0;

                /* note the fe->mask & mask & ... code: maybe an already processed
                 * event removed an element that fired and we still didn't
                 * processed, so we check if the event is still valid. */
                if (fe->mask & mask & AE_READABLE) {
                    rfired = 1;
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                }
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!rfired || fe->wfileProc != fe->rfileProc)
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                }
                processed++;
            }
        } /* closes a block whose opening is elided above */
        /* After the core network I/O events above, process the time events */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);

        return processed; /* return the number of processed file/time events */
    }

Finally, here is the core code that processes time events. Starting from timeEventHead, it walks the list and checks whether the current time is greater than or equal to each event's due time; if so, it runs that event's handler. The handler's return value decides what happens next: any value other than AE_NOMORE is taken as the number of milliseconds until the next run, so the event is rescheduled, while AE_NOMORE causes the event to be deleted. The function returns the number of events processed in this pass:

    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
        time_t now = time(NULL);

        //......
        /* If the system clock moved backwards, zero when_sec on every
         * event so that all of them are treated as due. */
        if (now < eventLoop->lastTime) {
            /* Start from the head of the time event list */
            te = eventLoop->timeEventHead;
            while(te) {
                te->when_sec = 0;
                te = te->next;
            }
        }
        eventLoop->lastTime = now;

        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        /* Loop over the time events and run those that are due */
        while(te) {
            long now_sec, now_ms;
            long long id;

            if (te->id > maxId) {
                te = te->next;
                continue;
            }
            aeGetTime(&now_sec, &now_ms);
            /* The current time has reached the event's due time */
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;

                id = te->id;
                /* Call the time event's handler */
                retval = te->timeProc(eventLoop, id, te->clientData);
                /* Count the processed event */
                processed++;
                //.....
                /* A return value other than AE_NOMORE marks a periodic
                 * event: schedule its next run. Otherwise delete it. */
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }
                te = eventLoop->timeEventHead;
            } else {
                te = te->next;
            }
        }
        return processed;
    }
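
The reschedule-or-delete rule can be tried out in isolation. The sketch below is a simplified model, not the Redis implementation: it keeps timers in an array instead of a linked list, takes the current time as a parameter instead of reading the clock, and marks one-shot timers inactive instead of freeing them. All names (timerSketch, processTimersSketch, SKETCH_NOMORE and the two handlers) are hypothetical.

```c
#include <stddef.h>

#define SKETCH_NOMORE -1 /* stands in for AE_NOMORE in this sketch */

typedef struct timerSketch timerSketch;
typedef int timerProcSketch(timerSketch *t);

struct timerSketch {
    long long when_ms;     /* absolute due time in milliseconds */
    timerProcSketch *proc; /* returns the next delay in ms, or SKETCH_NOMORE */
    int active;            /* set to 0 once a one-shot timer has fired */
    int fired;             /* how many times the handler has run */
};

/* Run every active timer whose due time has been reached.
 * Returns the number of handlers executed in this pass. */
int processTimersSketch(timerSketch *timers, size_t n, long long now_ms) {
    int processed = 0;
    for (size_t i = 0; i < n; i++) {
        timerSketch *t = &timers[i];
        if (!t->active || now_ms < t->when_ms) continue;
        int retval = t->proc(t);
        t->fired++;
        processed++;
        if (retval != SKETCH_NOMORE)
            t->when_ms = now_ms + retval; /* periodic: schedule the next run */
        else
            t->active = 0;                /* one-shot: retire the timer */
    }
    return processed;
}

/* A periodic handler that asks to run again in 100 ms. */
int everyHundredMs(timerSketch *t) { (void)t; return 100; }

/* A one-shot handler that asks not to be run again. */
int onlyOnce(timerSketch *t) { (void)t; return SKETCH_NOMORE; }
```

With one periodic timer due at 1 ms and one one-shot timer due at 50 ms, a pass at time 60 runs both handlers; afterwards the periodic timer is due again at 160 and the one-shot timer never fires again. Note that the next run is computed from the time of the pass rather than from the original due time, so a late pass pushes every subsequent run later as well.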

Redis optimization for time event implementation

Because time events need to run on schedule, Redis makes the following two optimizations to keep them from being delayed:

  • Time-consuming work, such as AOF rewriting, is done asynchronously in a forked child process.
  • When replying to a client socket, if the amount written in one handler call exceeds a preset limit, the handler stops and returns control to the event loop, so that time events are not starved.

For the first point, here is the core code for AOF rewriting. serverCron checks that no RDB or AOF child process is currently running and that an AOF rewrite has been scheduled; if so, it calls rewriteAppendOnlyFileBackground, which forks a child process to perform the rewrite:

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        //......
        /* Start a scheduled AOF rewrite if this was requested by the user while
         * a BGSAVE was in progress. */
        /* If aof_rewrite_scheduled is set and no other persistence child is
         * running, start the AOF rewrite; doing it asynchronously avoids
         * blocking the event loop */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
            server.aof_rewrite_scheduled)
        {
            rewriteAppendOnlyFileBackground();
        }
        //......
    }

    /* Fork a child process to rewrite the AOF */
    int rewriteAppendOnlyFileBackground(void) {
        //......
        if ((childpid = fork()) == 0) { /* fork a child for the AOF rewrite */
            char tmpfile[256];

            /* Child */
            closeListeningSockets(0);
            redisSetProcTitle("redis-aof-rewrite");
            /* Build the name of a temporary file */
            snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
            if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { /* rewrite the AOF */
                size_t private_dirty = zmalloc_get_private_dirty();
                //......
                exitFromChild(0);
            } else {
                exitFromChild(1);
            }
        } else {
            //......
        }
        return REDIS_OK; /* unreached */
    }
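
The pattern of handing slow work to a child process and checking on it later without blocking can be sketched with plain POSIX calls. This is an illustration under assumptions, not the Redis code: startBackgroundJobSketch, pollBackgroundJobSketch and pretendRewrite are hypothetical names, and polling with waitpid and WNOHANG is this sketch's choice for the non-blocking check.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Start heavyWork in a child process. The parent gets the child's pid
 * back immediately (or -1 if fork failed) and can keep serving events. */
pid_t startBackgroundJobSketch(int (*heavyWork)(void)) {
    pid_t childpid = fork();
    if (childpid == 0) {
        /* Child: do the slow work and report the result as the exit code. */
        _exit(heavyWork() == 0 ? 0 : 1);
    }
    return childpid;
}

/* Non-blocking check meant to be called from a periodic task.
 * Returns 1 if the child has finished (storing its exit code, or -1 if
 * it did not exit normally), 0 if it is still running, -1 on error. */
int pollBackgroundJobSketch(pid_t childpid, int *exitcode) {
    int status;
    pid_t r = waitpid(childpid, &status, WNOHANG);
    if (r == 0) return 0;
    if (r == -1) return -1;
    *exitcode = WIFEXITED(status) ? WEXITSTATUS(status) : -1;
    return 1;
}

/* Placeholder for the expensive job; returns 0 on success. */
int pretendRewrite(void) { return 0; }
```

A caller would invoke startBackgroundJobSketch(pretendRewrite) once and then call pollBackgroundJobSketch from its periodic task until it returns a non-zero value. The parent never waits on the child, so the slow job cannot delay the timers.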

For the second point, the handler sendReplyToClient, which writes replies to clients, contains a check on the number of bytes written so far, totwritten. If it exceeds REDIS_MAX_WRITE_PER_EVENT (the macro is defined as 1024*64, that is 64 KB), and memory use is below maxmemory or no maxmemory is set, the handler breaks out of the write loop and leaves the rest of the reply for the next iteration of the event loop. This keeps one large reply from starving other events and delaying time events:

    void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        //......
        while(c->bufpos > 0 || listLength(c->reply)) {
            //......
            /* If a file event writes too much data, yield so that time
             * events get a chance to run as promptly as possible */
            server.stat_net_output_bytes += totwritten;
            if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
                (server.maxmemory == 0 ||
                 zmalloc_used_memory() < server.maxmemory)) break;
        }
        //......
    }
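
The effect of a per-event write budget can be seen in a small simulation. The sketch below is an assumption-laden model, not the Redis handler: it replaces the socket with a counter of pending bytes, and the names writeOnceSketch and MAX_WRITE_PER_EVENT_SKETCH, as well as the fixed chunk size per write, are hypothetical.

```c
#include <stddef.h>

/* Per-event write budget assumed for this sketch: 64 KB. */
#define MAX_WRITE_PER_EVENT_SKETCH (1024 * 64)

/* Simulate one invocation of a write handler. *pending is the number of
 * bytes still queued for the client and chunk is how many bytes a single
 * write call sends (must be greater than 0). Returns the number of bytes
 * written during this invocation. */
size_t writeOnceSketch(size_t *pending, size_t chunk) {
    size_t totwritten = 0;
    if (chunk == 0) return 0;
    while (*pending > 0) {
        size_t n = *pending < chunk ? *pending : chunk;
        *pending -= n;
        totwritten += n;
        /* Budget exceeded: stop here so the event loop can run other
         * events; the remainder is sent on a later invocation. */
        if (totwritten > MAX_WRITE_PER_EVENT_SKETCH) break;
    }
    return totwritten;
}
```

With 200 KB pending and 16 KB written per call, the first two invocations each send 80 KB (the first chunk that pushes the total past 64 KB ends the loop) and the third sends the remaining 40 KB. Between invocations the event loop is free to run whatever time events have become due.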
