提交 5f29e2e2 编写于 作者: Y Yossi Gottlieb 提交者: antirez

Initial command filter experiment.

上级 e1839ab3
......@@ -270,6 +270,28 @@ typedef struct RedisModuleDictIter {
raxIterator ri;
} RedisModuleDictIter;
/* Information about the command to be executed, as passed to and from a
* filter. */
typedef struct RedisModuleFilteredCommand {
RedisModuleString **argv;
int argc;
} RedisModuleFilteredCommand;
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd);
typedef struct RedisModuleCommandFilter {
/* The module that registered the filter */
RedisModule *module;
/* Filter callback function */
RedisModuleCommandFilterFunc callback;
/* Indicates a filter is active, avoid reentrancy */
int active;
} RedisModuleCommandFilter;
/* Registered filters */
static list *moduleCommandFilters;
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
......@@ -4772,6 +4794,56 @@ int moduleUnregisterUsedAPI(RedisModule *module) {
return count;
}
/* --------------------------------------------------------------------------
* Module Command Filter API
* -------------------------------------------------------------------------- */
/* Register a new command filter function. Filters get executed by Redis
* before processing an inbound command and can be used to manipulate the
* behavior of standard Redis commands. Filters must not attempt to
* perform Redis commands or operate on the dataset, and must restrict
* themselves to manipulation of the arguments.
*/
int RM_RegisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc callback) {
RedisModuleCommandFilter *filter = zmalloc(sizeof(*filter));
filter->module = ctx->module;
filter->callback = callback;
filter->active = 0;
listAddNodeTail(moduleCommandFilters, filter);
return REDISMODULE_OK;
}
void moduleCallCommandFilters(client *c) {
if (listLength(moduleCommandFilters) == 0) return;
listIter li;
listNode *ln;
listRewind(moduleCommandFilters,&li);
RedisModuleFilteredCommand cmd = {
.argv = c->argv,
.argc = c->argc
};
while((ln = listNext(&li))) {
RedisModuleCommandFilter *filter = ln->value;
if (filter->active) continue;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.module = filter->module;
filter->active = 1;
filter->callback(&ctx, &cmd);
filter->active = 0;
moduleFreeContext(&ctx);
}
c->argv = cmd.argv;
c->argc = cmd.argc;
}
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
......@@ -4817,6 +4889,9 @@ void moduleInitModulesSystem(void) {
moduleFreeContextReusedClient = createClient(-1);
moduleFreeContextReusedClient->flags |= CLIENT_MODULE;
/* Set up filter list */
moduleCommandFilters = listCreate();
moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
......@@ -5215,4 +5290,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(DictCompare);
REGISTER_API(ExportSharedAPI);
REGISTER_API(GetSharedAPI);
REGISTER_API(RegisterCommandFilter);
}
......@@ -13,7 +13,7 @@ endif
.SUFFIXES: .c .so .xo .o
all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so
all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so
.c.xo:
$(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
......@@ -46,6 +46,10 @@ hellotimer.so: hellotimer.xo
hellodict.xo: ../redismodule.h
hellodict.so: hellodict.xo
hellofilter.xo: ../redismodule.h
hellofilter.so: hellofilter.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
testmodule.xo: ../redismodule.h
......
#define REDISMODULE_EXPERIMENTAL_API
#include "../redismodule.h"
static RedisModuleString *log_key_name;
static const char log_command_name[] = "hellofilter.log";
int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
RedisModuleString *s = RedisModule_CreateStringFromString(ctx, argv[0]);
int i;
for (i = 1; i < argc; i++) {
size_t arglen;
const char *arg = RedisModule_StringPtrLen(argv[i], &arglen);
RedisModule_StringAppendBuffer(ctx, s, " ", 1);
RedisModule_StringAppendBuffer(ctx, s, arg, arglen);
}
RedisModuleKey *log = RedisModule_OpenKey(ctx, log_key_name, REDISMODULE_WRITE|REDISMODULE_READ);
RedisModule_ListPush(log, REDISMODULE_LIST_HEAD, s);
RedisModule_CloseKey(log);
RedisModule_FreeString(ctx, s);
size_t cmdlen;
const char *cmdname = RedisModule_StringPtrLen(argv[1], &cmdlen);
RedisModuleCallReply *reply = RedisModule_Call(ctx, cmdname, "v", &argv[2], argc - 2);
if (reply) {
RedisModule_ReplyWithCallReply(ctx, reply);
RedisModule_FreeCallReply(reply);
} else {
RedisModule_ReplyWithSimpleString(ctx, "Unknown command or invalid arguments");
}
return REDISMODULE_OK;
}
void HelloFilter_CommandFilter(RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd)
{
cmd->argv = RedisModule_Realloc(cmd->argv, (cmd->argc+1)*sizeof(RedisModuleString *));
int i;
for (i = cmd->argc; i > 0; i--) {
cmd->argv[i] = cmd->argv[i-1];
}
cmd->argv[0] = RedisModule_CreateString(ctx, log_command_name, sizeof(log_command_name)-1);
cmd->argc++;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1)
== REDISMODULE_ERR) return REDISMODULE_ERR;
if (argc != 1) {
RedisModule_Log(ctx, "warning", "Log key name not specified");
return REDISMODULE_ERR;
}
log_key_name = RedisModule_CreateStringFromString(ctx, argv[0]);
if (RedisModule_CreateCommand(ctx,log_command_name,
HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter)
== REDISMODULE_ERR) return REDISMODULE_ERR;
return REDISMODULE_OK;
}
......@@ -163,6 +163,12 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef struct RedisModuleFilteredCommand {
RedisModuleString **argv;
int argc;
} RedisModuleFilteredCommand;
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCtx *ctx, RedisModuleFilteredCommand *cmd);
#define REDISMODULE_TYPE_METHOD_VERSION 1
typedef struct RedisModuleTypeMethods {
uint64_t version;
......@@ -338,6 +344,7 @@ void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedC
void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags);
int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func);
void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname);
int REDISMODULE_API_FUNC(RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb);
#endif
/* This is included inline inside each Redis module. */
......@@ -501,6 +508,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SetClusterFlags);
REDISMODULE_GET_API(ExportSharedAPI);
REDISMODULE_GET_API(GetSharedAPI);
REDISMODULE_GET_API(RegisterCommandFilter);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;
......
......@@ -2541,6 +2541,8 @@ void call(client *c, int flags) {
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
moduleCallCommandFilters(c);
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
......
......@@ -1410,7 +1410,7 @@ size_t moduleCount(void);
void moduleAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleCallCommandFilters(client *c);
/* Utils */
long long ustime(void);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册