提交 8fadfe52 编写于 作者: A antirez

Module: API to block clients with threading support.

Just a draft to align the main ideas, never executed code. Compiles.
上级 a5998d1f
......@@ -136,6 +136,8 @@ void unblockClient(client *c) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) {
unblockClientFromModule(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
......@@ -153,12 +155,15 @@ void unblockClient(client *c) {
}
/* This function gets called when a blocked client timed out in order to
* send it a reply of some kind. */
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else if (c->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
......
......@@ -105,6 +105,7 @@ struct RedisModuleCtx {
int flags; /* REDISMODULE_CTX_... flags. */
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a clinet. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos;
......@@ -114,10 +115,12 @@ struct RedisModuleCtx {
};
typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, 0, NULL}
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
/* This represents a Redis key opened with RM_OpenKey(). */
struct RedisModuleKey {
......@@ -183,6 +186,23 @@ typedef struct RedisModuleCallReply {
} val;
} RedisModuleCallReply;
/* Structure representing a blocked client. We get a pointer to such
* an object when blocking from modules. */
typedef struct RedisModuleBlockedClient {
client *client; /* Pointer to the blocked client. or NULL if the client
was destroyed during the life of this object. */
RedisModule *module; /* Module blocking the client. */
RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
void (*free_privdata)(void *); /* privdata cleanup callback. */
void *privdata; /* Module private data that may be used by the reply
or timeout callback. It is set via the
RedisModule_UnblockClient() API. */
} RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
static list *moduleUnblockedClients;
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
......@@ -403,26 +423,36 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
}
}
/* This Redis command binds the normal Redis command invocation with commands
* exported by modules. */
void RedisModuleCommandDispatcher(client *c) {
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
/* Helper function for when a command callback is called, in order to handle
* details needed to correctly replicate commands. */
void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
client *c = ctx->client;
ctx.module = cp->module;
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
/* We don't want any automatic propagation here since in modules we handle
* replication / AOF propagation in explicit ways. */
preventCommandPropagation(c);
/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrappered around MULTI/EXEC. */
if (ctx.flags & REDISMODULE_CTX_MULTI_EMITTED) {
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
robj *propargv[1];
propargv[0] = createStringObject("EXEC",4);
alsoPropagate(server.execCommand,c->db->id,propargv,1,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(propargv[0]);
}
}
/* This Redis command binds the normal Redis command invocation with commands
* exported by modules. */
void RedisModuleCommandDispatcher(client *c) {
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = cp->module;
ctx.client = c;
cp->func(&ctx,(void**)c->argv,c->argc);
moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
......@@ -3034,6 +3064,130 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ...
va_end(ap);
}
/* --------------------------------------------------------------------------
* Blocking clients from modules
* -------------------------------------------------------------------------- */
/* This is called from blocked.c in order to unblock a client: may be called
* for multiple reasons while the client is in the middle of being blocked
* because the client is terminated, but is also called for cleanup when a
* client is unblocked in a clean way after replaying.
*
* What we do here is just to set the client to NULL in the redis module
* blocked client handle. This way if the client is terminated while there
* is a pending threaded operation involving the blocked client, we'll know
* that the client no longer exists and no reply callback should be called.
*
* The structure RedisModuleBlockedClient will be always deallocated when
* running the list of clients blocked by a module that need to be unblocked. */
void unblockClientFromModule(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
bc->client = NULL;
}
int RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms) {
client *c = ctx->client;
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
bc->client = c;
bc->module = ctx->module;
bc->reply_callback = reply_callback;
bc->timeout_callback = timeout_callback;
bc->free_privdata = free_privdata;
bc->privdata = NULL;
c->bpop.timeout = timeout_ms;
blockClient(c,BLOCKED_MODULE);
return REDISMODULE_OK;
}
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so
* the caller of this function can pass any value that is needed in order to
* actually reply to the client.
*
* A common usage for 'privdata' is a thread that computes something that
* needs to be passed to the client, included but not limited some slow
* to compute reply or some reply obtained via networking.
*
* Note: this function can be called from threads spawned by the module. */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK;
}
/* This function will check the moduleUnblockedClients queue in order to
* call the reply callback and really unblock the client.
*
* Clients end into this list because of calls to RM_UnblockClient(),
* however it is possible that while the module was doing work for the
* blocked client, it was terminated by Redis (for timeout or other reasons).
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
listNode *ln;
RedisModuleBlockedClient *bc;
pthread_mutex_lock(&moduleUnblockedClientsMutex);
while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients);
bc = ln->value;
client *c = bc->client;
listDelNode(server.unblocked_clients,ln);
if (c != NULL) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata;
ctx.module = bc->module;
ctx.client = bc->client;
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
moduleHandlePropagationAfterCommandCallback(&ctx);
moduleFreeContext(&ctx);
}
if (bc->privdata && bc->free_privdata)
bc->free_privdata(bc->privdata);
zfree(bc);
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
}
/* Called when our client timed out. After this function unblockClient()
* is called, and it will invalidate the blocked client. So this function
* does not need to do any cleanup. Eventually the module will call the
* API to unblock the client and the memory will be released. */
void moduleBlockedClientTimedOut(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_TIMEOUT;
ctx.module = bc->module;
ctx.client = bc->client;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
}
/* Return non-zero if a module command was called in order to fill the
* reply for a blocked client. */
int RM_IsBlockedReplyRequest(RedisModuleCtx *ctx) {
return (ctx->flags & REDISMODULE_CTX_BLOCKED_REPLY) != 0;
}
/* Return non-zero if a module command was called in order to fill the
* reply for a blocked client that timed out. */
int RM_IsBlockedTimeoutRequest(RedisModuleCtx *ctx) {
return (ctx->flags & REDISMODULE_CTX_BLOCKED_TIMEOUT) != 0;
}
/* Get the privata data set by RedisModule_UnblockClient() */
void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata;
}
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
......@@ -3070,6 +3224,8 @@ int moduleRegisterApi(const char *funcname, void *funcptr) {
void moduleRegisterCoreAPI(void);
void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
server.loadmodule_queue = listCreate();
modules = dictCreate(&modulesDictType,NULL);
moduleRegisterCoreAPI();
......
......@@ -1195,6 +1195,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients();
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
......
......@@ -245,6 +245,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_NONE 0 /* Not blocked, no CLIENT_BLOCKED flag set. */
#define BLOCKED_LIST 1 /* BLPOP & co. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
/* Client request types */
#define PROTO_REQ_INLINE 1
......@@ -619,6 +620,11 @@ typedef struct blockingState {
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
/* BLOCKED_MODULE */
void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only
handled in module.c. */
} blockingState;
/* The following structure represents a node in the server.ready_keys list,
......@@ -1226,6 +1232,9 @@ int *moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc,
moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
/* Utils */
long long ustime(void);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册