提交 a4ce7581 编写于 作者: P Pieter Noordhuis

Don't execute commands for clients when they are unblocked

上级 ecf94014
...@@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { ...@@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* for ready file descriptors. */ * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) { void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(eventLoop);
listNode *ln;
redisClient *c;
/* Awake clients that got all the swapped keys they requested */ /* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) { if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li; listIter li;
listNode *ln;
listRewind(server.io_ready_clients,&li); listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
redisClient *c = ln->value; c = ln->value;
struct redisCommand *cmd; struct redisCommand *cmd;
/* Resume the client. */ /* Resume the client. */
...@@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { ...@@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
processInputBuffer(c); processInputBuffer(c);
} }
} }
/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);
/* Process remaining data in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
/* Write the AOF buffer on disk */ /* Write the AOF buffer on disk */
flushAppendOnlyFile(); flushAppendOnlyFile();
} }
...@@ -818,6 +832,7 @@ void initServer() { ...@@ -818,6 +832,7 @@ void initServer() {
server.clients = listCreate(); server.clients = listCreate();
server.slaves = listCreate(); server.slaves = listCreate();
server.monitors = listCreate(); server.monitors = listCreate();
server.unblocked_clients = listCreate();
createSharedObjects(); createSharedObjects();
server.el = aeCreateEventLoop(); server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum); server.db = zmalloc(sizeof(redisDb)*server.dbnum);
......
...@@ -435,6 +435,7 @@ struct redisServer { ...@@ -435,6 +435,7 @@ struct redisServer {
/* Blocked clients */ /* Blocked clients */
unsigned int bpop_blocked_clients; unsigned int bpop_blocked_clients;
unsigned int vm_blocked_clients; unsigned int vm_blocked_clients;
list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we /* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */ * have to take this state global, in order to pass it to sortCompare() */
int sort_desc; int sort_desc;
......
...@@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) { ...@@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) {
c->bpop.target = NULL; c->bpop.target = NULL;
c->flags &= (~REDIS_BLOCKED); c->flags &= (~REDIS_BLOCKED);
server.bpop_blocked_clients--; server.bpop_blocked_clients--;
/* We want to process data if there is some command waiting listAddNodeTail(server.unblocked_clients,c);
* in the input buffer. Note that this is safe even if
* unblockClientWaitingData() gets called from freeClient() because
* freeClient() will be smart enough to call this function
* *after* c->querybuf was set to NULL. */
if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
} }
/* This should be called from any function PUSHing into lists. /* This should be called from any function PUSHing into lists.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册