From a4ce7581553b1f4e29a7ed2141add788e56142c5 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:39:39 +0100 Subject: [PATCH] Don't execute commands for clients when they are unblocked --- src/redis.c | 19 +++++++++++++++++-- src/redis.h | 1 + src/t_list.c | 7 +------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/redis.c b/src/redis.c index 8a5f9632..a1653c36 100644 --- a/src/redis.c +++ b/src/redis.c @@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + listNode *ln; + redisClient *c; /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; - listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { - redisClient *c = ln->value; + c = ln->value; struct redisCommand *cmd; /* Resume the client. */ @@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + + /* Try to process pending commands for clients that were just unblocked. */ + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(); } @@ -818,6 +832,7 @@ void initServer() { server.clients = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.unblocked_clients = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); diff --git a/src/redis.h b/src/redis.h index 27cb8259..3639f062 100644 --- a/src/redis.h +++ b/src/redis.h @@ -435,6 +435,7 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; + list *unblocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/src/t_list.c b/src/t_list.c index 867e258a..7dc3f139 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.bpop_blocked_clients--; - /* We want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if - * unblockClientWaitingData() gets called from freeClient() because - * freeClient() will be smart enough to call this function - * *after* c->querybuf was set to NULL. */ - if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); + listAddNodeTail(server.unblocked_clients,c); } /* This should be called from any function PUSHing into lists. -- GitLab