From 82bfae5b70aa840f14d15660b6d0212c0f761598 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 15 Oct 2014 15:31:19 +0200 Subject: [PATCH] Diskless replication: handle putting the slave online. --- src/replication.c | 88 ++++++++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 31 deletions(-) diff --git a/src/replication.c b/src/replication.c index d39da011..49af06a5 100644 --- a/src/replication.c +++ b/src/replication.c @@ -605,6 +605,29 @@ void replconfCommand(redisClient *c) { addReply(c,shared.ok); } +/* This function puts a slave in the online state, and should be called just + * after a slave received the RDB file for the initial synchronization, and + * we are finally ready to send the incremental stream of commands. + * + * It does a few things: + * + * 1) Put the slave in ONLINE state. + * 2) Make sure the writable event is re-installed, since calling the SYNC + * command disables it, so that we can accumulate output buffer without + * sending it to the slave. + * 3) Update the count of good slaves. */ +void putSlaveOnline(redisClient *slave) { + slave->replstate = REDIS_REPL_ONLINE; + slave->repl_ack_time = server.unixtime; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave) == AE_ERR) { + redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); + freeClient(slave); + return; + } + refreshGoodSlavesCount(); +} + void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *slave = privdata; REDIS_NOTUSED(el); @@ -655,16 +678,8 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { close(slave->repldbfd); slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - slave->replstate = REDIS_REPL_ONLINE; - slave->repl_ack_time = server.unixtime; - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, - sendReplyToClient, slave) == AE_ERR) { - redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno)); - freeClient(slave); - return; - } - refreshGoodSlavesCount(); - redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); + putSlaveOnline(slave); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded (disk)"); } } @@ -694,27 +709,38 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; - if (bgsaveerr != REDIS_OK) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); - continue; - } - if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || - redis_fstat(slave->repldbfd,&buf) == -1) { - freeClient(slave); - redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); - continue; - } - slave->repldboff = 0; - slave->repldbsize = buf.st_size; - slave->replstate = REDIS_REPL_SEND_BULK; - slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", - (unsigned long long) slave->repldbsize); - - aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { - freeClient(slave); - continue; + /* If this was an RDB on disk save, we have to prepare to send + * the RDB from disk to the slave socket. Otherwise if this was + * already an RDB -> Slaves socket transfer, used in the case of + * diskless replication, our work is trivial, we can just put + * the slave online. */ + if (type == REDIS_RDB_CHILD_TYPE_SOCKET) { + putSlaveOnline(slave); + redisLog(REDIS_NOTICE, + "Synchronization with slave succeeded (socket)"); + } else { + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 || + redis_fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", + (unsigned long long) slave->repldbsize); + + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + freeClient(slave); + continue; + } } } } -- GitLab