提交 c5618e7f 编写于 作者: A antirez

WAIT command: synchronous replication for Redis.

上级 c2f30554
...@@ -130,6 +130,8 @@ void processUnblockedClients(void) { ...@@ -130,6 +130,8 @@ void processUnblockedClients(void) {
void unblockClient(redisClient *c) { void unblockClient(redisClient *c) {
if (c->btype == REDIS_BLOCKED_LIST) { if (c->btype == REDIS_BLOCKED_LIST) {
unblockClientWaitingData(c); unblockClientWaitingData(c);
} else if (c->btype == REDIS_BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else { } else {
redisPanic("Unknown btype in unblockClient()."); redisPanic("Unknown btype in unblockClient().");
} }
...@@ -147,6 +149,8 @@ void unblockClient(redisClient *c) { ...@@ -147,6 +149,8 @@ void unblockClient(redisClient *c) {
void replyToBlockedClientTimedOut(redisClient *c) { void replyToBlockedClientTimedOut(redisClient *c) {
if (c->btype == REDIS_BLOCKED_LIST) { if (c->btype == REDIS_BLOCKED_LIST) {
addReply(c,shared.nullmultibulk); addReply(c,shared.nullmultibulk);
} else if (c->btype == REDIS_BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
} else { } else {
redisPanic("Unknown btype in replyToBlockedClientTimedOut()."); redisPanic("Unknown btype in replyToBlockedClientTimedOut().");
} }
......
...@@ -114,6 +114,7 @@ redisClient *createClient(int fd) { ...@@ -114,6 +114,7 @@ redisClient *createClient(int fd) {
c->bpop.target = NULL; c->bpop.target = NULL;
c->bpop.numreplicas = 0; c->bpop.numreplicas = 0;
c->bpop.reploffset = 0; c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate(); c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate(); c->pubsub_patterns = listCreate();
......
...@@ -263,7 +263,8 @@ struct redisCommand redisCommandTable[] = { ...@@ -263,7 +263,8 @@ struct redisCommand redisCommandTable[] = {
{"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}, {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
{"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}, {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0},
{"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
{"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0} {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
{"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0}
}; };
/*============================ Utility functions ============================ */ /*============================ Utility functions ============================ */
...@@ -1200,8 +1201,29 @@ void beforeSleep(struct aeEventLoop *eventLoop) { ...@@ -1200,8 +1201,29 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (server.active_expire_enabled && server.masterhost == NULL) if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Send all the slaves an ACK request if at least one client blocked
* during the previous event loop iteration. */
if (server.get_ack_from_slaves) {
robj *argv[3];
argv[0] = createStringObject("REPLCONF",8);
argv[1] = createStringObject("GETACK",6);
argv[2] = createStringObject("*",1); /* Not used argument. */
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[2]);
server.get_ack_from_slaves = 0;
}
/* Unblock all the clients blocked for synchronous replication
* in WAIT. */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
processUnblockedClients(); if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Write the AOF buffer on disk */ /* Write the AOF buffer on disk */
flushAppendOnlyFile(0); flushAppendOnlyFile(0);
...@@ -1557,6 +1579,8 @@ void initServer() { ...@@ -1557,6 +1579,8 @@ void initServer() {
server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate(); server.unblocked_clients = listCreate();
server.ready_keys = listCreate(); server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
createSharedObjects(); createSharedObjects();
adjustOpenFilesLimit(); adjustOpenFilesLimit();
...@@ -2079,6 +2103,7 @@ int processCommand(redisClient *c) { ...@@ -2079,6 +2103,7 @@ int processCommand(redisClient *c) {
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
call(c,REDIS_CALL_FULL); call(c,REDIS_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys)) if (listLength(server.ready_keys))
handleClientsBlockedOnLists(); handleClientsBlockedOnLists();
} }
......
...@@ -495,7 +495,8 @@ typedef struct redisClient { ...@@ -495,7 +495,8 @@ typedef struct redisClient {
int slave_listening_port; /* As configured with: SLAVECONF listening-port */ int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */ multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if REDIS_BLOCKED. */ int btype; /* Type of blocking op if REDIS_BLOCKED. */
blockingState bpop; /* blocking state */ blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
...@@ -750,6 +751,9 @@ struct redisServer { ...@@ -750,6 +751,9 @@ struct redisServer {
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
int repl_scriptcache_size; /* Max number of elements. */ int repl_scriptcache_size; /* Max number of elements. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */ /* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */ unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */ unsigned long long maxmemory; /* Max number of memory bytes to use */
...@@ -1063,6 +1067,9 @@ void replicationScriptCacheInit(void); ...@@ -1063,6 +1067,9 @@ void replicationScriptCacheInit(void);
void replicationScriptCacheFlush(void); void replicationScriptCacheFlush(void);
void replicationScriptCacheAdd(sds sha1); void replicationScriptCacheAdd(sds sha1);
int replicationScriptCacheExists(sds sha1); int replicationScriptCacheExists(sds sha1);
void processClientsWaitingReplicas(void);
void unblockClientWaitingReplicas(redisClient *c);
int replicationCountAcksByOffset(long long offset);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoading(FILE *fp); void startLoading(FILE *fp);
...@@ -1398,6 +1405,7 @@ void timeCommand(redisClient *c); ...@@ -1398,6 +1405,7 @@ void timeCommand(redisClient *c);
void bitopCommand(redisClient *c); void bitopCommand(redisClient *c);
void bitcountCommand(redisClient *c); void bitcountCommand(redisClient *c);
void replconfCommand(redisClient *c); void replconfCommand(redisClient *c);
void waitCommand(redisClient *c);
#if defined(__GNUC__) #if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
......
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
void replicationDiscardCachedMaster(void); void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(int newfd); void replicationResurrectCachedMaster(int newfd);
void replicationSendAck(void);
/* ---------------------------------- MASTER -------------------------------- */ /* ---------------------------------- MASTER -------------------------------- */
...@@ -560,6 +561,11 @@ void replconfCommand(redisClient *c) { ...@@ -560,6 +561,11 @@ void replconfCommand(redisClient *c) {
c->repl_ack_time = server.unixtime; c->repl_ack_time = server.unixtime;
/* Note: this command does not reply anything! */ /* Note: this command does not reply anything! */
return; return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
/* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */
if (server.masterhost && server.master) replicationSendAck();
/* Note: this command does not reply anything! */
} else { } else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr); (char*)c->argv[j]->ptr);
...@@ -1495,7 +1501,136 @@ int replicationScriptCacheExists(sds sha1) { ...@@ -1495,7 +1501,136 @@ int replicationScriptCacheExists(sds sha1) {
return dictFind(server.repl_scriptcache_dict,sha1) != NULL; return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
} }
/* --------------------------- REPLICATION CRON ----------------------------- */ /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
* Redis synchronous replication design can be summarized in points:
*
* - Redis masters have a global replication offset, used by PSYNC.
* - Master increment the offset every time new commands are sent to slaves.
* - Slaves ping back masters with the offset processed so far.
*
* So synchronous replication adds a new WAIT command in the form:
*
* WAIT <num_replicas> <milliseconds_timeout>
*
* That returns the number of replicas that processed the query when
* we finally have at least num_replicas, or when the timeout was
* reached.
*
* The command is implemented in this way:
*
* - Every time a client processes a command, we remember the replication
* offset after sending that command to the slaves.
* - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
* The client is blocked at the same time (see blocked.c).
* - Once we receive enough ACKs for a given offset or when the timeout
* is reached, the WAIT command is unblocked and the reply sent to the
* client.
*/
/* This just set a flag so that we broadcast a REPLCONF GETACK command
* to all the slaves in the beforeSleep() function. Note that this way
* we "group" all the clients that want to wait for synchronouns replication
* in a given event loop iteration, and send a single GETACK for them all. */
void replicationRequestAckFromSlaves(void) {
server.get_ack_from_slaves = 1;
}
/* Return the number of slaves that already acknowledged the specified
* replication offset. */
int replicationCountAcksByOffset(long long offset) {
listIter li;
listNode *ln;
int count = 0;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
if (slave->replstate != REDIS_REPL_ONLINE) continue;
if (slave->repl_ack_off >= offset) count++;
}
return count;
}
/* WAIT for N replicas to acknowledge the processing of our latest
* write command (and all the previous commands). */
void waitCommand(redisClient *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
/* Argument parsing. */
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
return;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= REDIS_OK) return;
/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
addReplyLongLong(c,ackreplicas);
return;
}
/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
c->bpop.timeout = timeout;
c->bpop.reploffset = offset;
c->bpop.numreplicas = numreplicas;
listAddNodeTail(server.clients_waiting_acks,c);
blockClient(c,REDIS_BLOCKED_WAIT);
/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
replicationRequestAckFromSlaves();
}
/* This is called by unblockClient() to perform the blocking op type
* specific cleanup. We just remove the client from the list of clients
* waiting for replica acks. Never call it directly, call unblockClient()
* instead. */
void unblockClientWaitingReplicas(redisClient *c) {
listNode *ln = listSearchKey(server.clients_waiting_acks,c);
redisAssert(ln != NULL);
listDelNode(server.clients_waiting_acks,ln);
}
/* Check if there are clients blocked in WAIT that can be unblocked since
* we received enough ACKs from slaves. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
int last_numreplicas = 0;
listIter li;
listNode *ln;
listRewind(server.clients_waiting_acks,&li);
while((ln = listNext(&li))) {
redisClient *c = ln->value;
/* Every time we find a client that is satisfied for a given
* offset and number of replicas, we remember it so the next client
* may be unblocked without calling replicationCountAcksByOffset()
* if the requested offset / replicas were equal or less. */
if (last_offset && last_offset > c->bpop.reploffset &&
last_numreplicas > c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
}
}
}
}
/* --------------------------- REPLICATION CRON ---------------------------- */
/* Replication cron funciton, called 1 time per second. */ /* Replication cron funciton, called 1 time per second. */
void replicationCron(void) { void replicationCron(void) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册