diff --git a/src/redis.c b/src/redis.c index 2eed47030b6e99ede8ebc31a106e7e46f7e1e792..78067d3106343484cedcd557fc8dfc3e9ff0637f 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1161,6 +1161,43 @@ void resetCommandTableStats(void) { } } +/* ========================== Redis OP Array API ============================ */ + +void redisOpArrayInit(redisOpArray *oa) { + oa->ops = NULL; + oa->numops = 0; +} + +int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, + robj **argv, int argc, int target) +{ + redisOp *op; + + oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); + op = oa->ops+oa->numops; + op->cmd = cmd; + op->dbid = dbid; + op->argv = argv; + op->argc = argc; + op->target = target; + oa->numops++; + return oa->numops; +} + +void redisOpArrayFree(redisOpArray *oa) { + while(oa->numops) { + int j; + redisOp *op; + + oa->numops--; + op = oa->ops+oa->numops; + for (j = 0; j < op->argc; j++) + decrRefCount(op->argv[j]); + zfree(op->argv); + } + zfree(oa->ops); +} + /* ====================== Commands lookup and execution ===================== */ struct redisCommand *lookupCommand(sds name) { @@ -1193,18 +1230,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, replicationFeedSlaves(server.slaves,dbid,argv,argc); } -/* Used inside commands to propatate an additional command if needed. */ +/* Used inside commands to schedule the propagation of additional commands + * after the current command is propagated to AOF / Replication. */ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target) { - propagatedItem *pi = &server.also_propagate; - - redisAssert(pi->target == REDIS_PROPAGATE_NONE); - pi->cmd = cmd; - pi->dbid = dbid; - pi->argv = argv; - pi->argc = argc; - pi->target = target; + redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target); } /* Call() is the core of Redis execution of a command */ @@ -1217,7 +1248,7 @@ void call(redisClient *c, int flags) { replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); /* Call the command. */ - server.also_propagate.target = REDIS_PROPAGATE_NONE; + redisOpArrayInit(&server.also_propagate); dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; @@ -1250,13 +1281,15 @@ void call(redisClient *c, int flags) { } /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional * PUSH command. */ - if (server.also_propagate.target != REDIS_PROPAGATE_NONE) { + if (server.also_propagate.numops) { int j; - propagatedItem *pi = &server.also_propagate; + redisOp *rop; - propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target); - for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]); - zfree(pi->argv); + for (j = 0; j < server.also_propagate.numops; j++) { + rop = &server.also_propagate.ops[j]; + propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target); + } + redisOpArrayFree(&server.also_propagate); } server.stat_numcommands++; } diff --git a/src/redis.h b/src/redis.h index 09a746cc55b6cd924f5d1a39c0e1796c52a9813b..c26178702cdb04fa041cfb4b773e66fc7fa12478 100644 --- a/src/redis.h +++ b/src/redis.h @@ -399,16 +399,29 @@ typedef struct clientBufferLimitsConfig { time_t soft_limit_seconds; } clientBufferLimitsConfig; -/* Currently only used to additionally propagate more commands to AOF/Replication - * after the propagation of the executed command. - * The structure contains everything needed to propagate a command: - * argv and argc, the ID of the database, pointer to the command table entry, - * and finally the target, that is an xor between REDIS_PROPAGATE_* flags. */ -typedef struct propagatedItem { +/* The redisOp structure defines a Redis Operation, that is an instance of + * a command with an argument vector, database ID, propagation target + * (REDIS_PROPAGATE_*), and command pointer. + * + * Currently only used to additionally propagate more commands to AOF/Replication + * after the propagation of the executed command. */ +typedef struct redisOp { robj **argv; int argc, dbid, target; struct redisCommand *cmd; -} propagatedItem; +} redisOp; + +/* Defines an array of Redis operations. There is an API to add to this + * structure in a easy way. + * + * redisOpArrayInit(); + * redisOpArrayAppend(); + * redisOpArrayFree(); + */ +typedef struct redisOpArray { + redisOp *ops; + int numops; +} redisOpArray; /*----------------------------------------------------------------------------- * Redis cluster data structures @@ -624,7 +637,7 @@ struct redisServer { char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ /* Propagation of commands in AOF / replication */ - propagatedItem also_propagate; /* Additional command to propagate. */ + redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ char *logfile; /* Path of log file */ int syslog_enabled; /* Is syslog enabled? */