diff --git a/src/replication.c b/src/replication.c index 348428c56dad68736ad734267f489241210033ff..a028359abbf4593f17ea115233403280e76424a2 100644 --- a/src/replication.c +++ b/src/replication.c @@ -424,6 +424,7 @@ int masterTryPartialResynchronization(redisClient *c) { * 3) Send the backlog data (from the offset to the end) to the slave. */ c->flags |= REDIS_SLAVE; c->replstate = REDIS_REPL_ONLINE; + c->repl_ack_time = server.unixtime; listAddNodeTail(server.slaves,c); /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is @@ -655,6 +656,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { slave->repldbfd = -1; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); slave->replstate = REDIS_REPL_ONLINE; + slave->repl_ack_time = server.unixtime; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { freeClient(slave); @@ -1457,6 +1459,31 @@ void replicationCron(void) { } } + /* Disconnect timedout slaves. */ + if (listLength(server.slaves)) { + listIter li; + listNode *ln; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = ln->value; + + if (slave->replstate != REDIS_REPL_ONLINE) continue; + if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) + { + char ip[32]; + int port; + + if (anetPeerToString(slave->fd,ip,&port) != -1) { + redisLog(REDIS_WARNING, + "Disconnecting timedout slave: %s:%d", + ip, slave->slave_listening_port); + } + freeClient(slave); + } + } + } + /* If we have no attached slaves and there is a replication backlog * using memory, free it after some (configured) time. */ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&