From f4823497074eaa3b567e38313d6e85c9cf9ece9f Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 24 Apr 2014 17:36:47 +0200 Subject: [PATCH] Process events with processEventsWhileBlocked() when blocked. When we are blocked and a few events a processed from time to time, it is smarter to call the event handler a few times in order to handle the accept, read, write, close cycle of a client in a single pass, otherwise there is too much latency added for clients to receive a reply while the server is busy in some way (for example during the DB loading). --- src/aof.c | 2 +- src/networking.c | 23 +++++++++++++++++++++++ src/rdb.c | 2 +- src/redis.h | 1 + src/scripting.c | 3 +-- 5 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/aof.c b/src/aof.c index ad743bf2..567d7ffc 100644 --- a/src/aof.c +++ b/src/aof.c @@ -559,7 +559,7 @@ int loadAppendOnlyFile(char *filename) { /* Serve the clients from time to time */ if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); - aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); + processEventsWhileBlocked(); } if (fgets(buf,sizeof(buf),fp) == NULL) { diff --git a/src/networking.c b/src/networking.c index 6de7fc12..f77f2a71 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1541,3 +1541,26 @@ void flushSlavesOutputBuffers(void) { } } } + +/* This function is called by Redis in order to process a few events from + * time to time while blocked into some not interruptible operation. + * This allows to reply to clients with the -LOADING error while loading the + * data set at startup or after a full resynchronization with the master + * and so forth. + * + * It calls the event loop in order to process a few events. Specifically we + * try to call the event loop for times as long as we receive acknowledge that + * some event was processed, in order to go forward with the accept, read, + * write, close sequence needed to serve a client. + * + * The function returns the total number of events processed. */ +int processEventsWhileBlocked(void) { + int iterations = 4; /* See the function top-comment. */ + int count = 0; + while (iterations--) { + int events = aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); + if (!events) break; + count += events; + } + return count; +} diff --git a/src/rdb.c b/src/rdb.c index 8c9f8415..32808ad1 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1073,7 +1073,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); - aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); + processEventsWhileBlocked(); } } diff --git a/src/redis.h b/src/redis.h index 862d580b..87061cc8 100644 --- a/src/redis.h +++ b/src/redis.h @@ -940,6 +940,7 @@ int getClientLimitClassByName(char *name); char *getClientLimitClassName(int class); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); +int processEventsWhileBlocked(void); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) diff --git a/src/scripting.c b/src/scripting.c index 1298327e..327d8122 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -510,8 +510,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { * here when the EVAL command will return. */ aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE); } - if (server.lua_timedout) - aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); + if (server.lua_timedout) processEventsWhileBlocked(); if (server.lua_kill) { redisLog(REDIS_WARNING,"Lua script killed by user with SCRIPT KILL."); lua_pushstring(lua,"Script killed by user with SCRIPT KILL..."); -- GitLab