提交 6c001bfc 编写于 作者: A antirez

Unblocked clients API refactoring. See #4418.

上级 2b689ad6
......@@ -132,6 +132,31 @@ void processUnblockedClients(void) {
}
}
/* This function will schedule the client for reprocessing at a safe time.
*
* This is useful when a client was blocked for some reason (blocking opeation,
* CLIENT PAUSE, or whatever), because it may end with some accumulated query
* buffer that needs to be processed ASAP:
*
* 1. When a client is blocked, its readable handler is still active.
* 2. However in this case it only gets data into the query buffer, but the
* query is not parsed or executed once there is enough to proceed as
* usually (because the client is blocked... so we can't execute commands).
* 3. When the client is unblocked, without this function, the client would
* have to write some query in order for the readable handler to finally
* call processQueryBuffer*() on it.
* 4. With this function instead we can put the client in a queue that will
* process it for queries ready to be executed at a safe time.
*/
void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,c);
}
}
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
......@@ -152,12 +177,7 @@ void unblockClient(client *c) {
server.blocked_clients_by_type[c->btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->btype = BLOCKED_NONE;
/* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */
if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,c);
}
queueClientForReprocessing(c);
}
/* This function gets called when a blocked client timed out in order to
......
......@@ -2134,11 +2134,10 @@ int clientsArePaused(void) {
while ((ln = listNext(&li)) != NULL) {
c = listNodeValue(ln);
/* Don't touch slaves and blocked or unblocked clients.
* The latter pending requests be processed when unblocked. */
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED|CLIENT_UNBLOCKED)) continue;
c->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,c);
/* Don't touch slaves and blocked clients.
* The latter pending requests will be processed when unblocked. */
if (c->flags & (CLIENT_SLAVE|CLIENT_BLOCKED)) continue;
queueClientForReprocessing(c);
}
}
return server.clients_paused;
......
......@@ -1367,10 +1367,8 @@ void evalGenericCommand(client *c, int evalsha) {
* script timeout was detected. */
aeCreateFileEvent(server.el,c->fd,AE_READABLE,
readQueryFromClient,c);
if (server.masterhost && server.master && !(server.master->flags & CLIENT_UNBLOCKED)) {
server.master->flags |= CLIENT_UNBLOCKED;
listAddNodeTail(server.unblocked_clients,server.master);
}
if (server.masterhost && server.master)
queueClientForReprocessing(server.master);
}
server.lua_caller = NULL;
......
......@@ -1884,6 +1884,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body);
void processUnblockedClients(void);
void blockClient(client *c, int btype);
void unblockClient(client *c);
void queueClientForReprocessing(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册