提交 78d6a22d 编写于 作者: A antirez

Better system for additional commands replication.

The new code uses a more generic data structure to describe redis operations.
The new design allows for multiple alsoPropagate() calls within the scope of a
single command, that is useful in different contexts. For instance there
when there are multiple clients doing BRPOPLPUSH against the same list,
and a variadic LPUSH is performed against this list, the blocked clients
will both be served, and we should correctly replicate multiple LPUSH
commands after the replication of the current command.
上级 eeb34eff
......@@ -1161,6 +1161,43 @@ void resetCommandTableStats(void) {
}
}
/* ========================== Redis OP Array API ============================ */
void redisOpArrayInit(redisOpArray *oa) {
oa->ops = NULL;
oa->numops = 0;
}
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
robj **argv, int argc, int target)
{
redisOp *op;
oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
op = oa->ops+oa->numops;
op->cmd = cmd;
op->dbid = dbid;
op->argv = argv;
op->argc = argc;
op->target = target;
oa->numops++;
return oa->numops;
}
void redisOpArrayFree(redisOpArray *oa) {
while(oa->numops) {
int j;
redisOp *op;
oa->numops--;
op = oa->ops+oa->numops;
for (j = 0; j < op->argc; j++)
decrRefCount(op->argv[j]);
zfree(op->argv);
}
zfree(oa->ops);
}
/* ====================== Commands lookup and execution ===================== */
struct redisCommand *lookupCommand(sds name) {
......@@ -1193,18 +1230,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
/* Used inside commands to propatate an additional command if needed. */
/* Used inside commands to schedule the propagation of additional commands
* after the current command is propagated to AOF / Replication. */
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int target)
{
propagatedItem *pi = &server.also_propagate;
redisAssert(pi->target == REDIS_PROPAGATE_NONE);
pi->cmd = cmd;
pi->dbid = dbid;
pi->argv = argv;
pi->argc = argc;
pi->target = target;
redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
}
/* Call() is the core of Redis execution of a command */
......@@ -1217,7 +1248,7 @@ void call(redisClient *c, int flags) {
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
/* Call the command. */
server.also_propagate.target = REDIS_PROPAGATE_NONE;
redisOpArrayInit(&server.also_propagate);
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
......@@ -1250,13 +1281,15 @@ void call(redisClient *c, int flags) {
}
/* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
* PUSH command. */
if (server.also_propagate.target != REDIS_PROPAGATE_NONE) {
if (server.also_propagate.numops) {
int j;
propagatedItem *pi = &server.also_propagate;
redisOp *rop;
propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target);
for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]);
zfree(pi->argv);
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
}
redisOpArrayFree(&server.also_propagate);
}
server.stat_numcommands++;
}
......
......@@ -399,16 +399,29 @@ typedef struct clientBufferLimitsConfig {
time_t soft_limit_seconds;
} clientBufferLimitsConfig;
/* Currently only used to additionally propagate more commands to AOF/Replication
* after the propagation of the executed command.
* The structure contains everything needed to propagate a command:
* argv and argc, the ID of the database, pointer to the command table entry,
* and finally the target, that is an xor between REDIS_PROPAGATE_* flags. */
typedef struct propagatedItem {
/* The redisOp structure defines a Redis Operation, that is an instance of
* a command with an argument vector, database ID, propagation target
* (REDIS_PROPAGATE_*), and command pointer.
*
* Currently only used to additionally propagate more commands to AOF/Replication
* after the propagation of the executed command. */
typedef struct redisOp {
robj **argv;
int argc, dbid, target;
struct redisCommand *cmd;
} propagatedItem;
} redisOp;
/* Defines an array of Redis operations. There is an API to add to this
* structure in a easy way.
*
* redisOpArrayInit();
* redisOpArrayAppend();
* redisOpArrayFree();
*/
typedef struct redisOpArray {
redisOp *ops;
int numops;
} redisOpArray;
/*-----------------------------------------------------------------------------
* Redis cluster data structures
......@@ -624,7 +637,7 @@ struct redisServer {
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
/* Propagation of commands in AOF / replication */
propagatedItem also_propagate; /* Additional command to propagate. */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册