diff --git a/src/rdb.c b/src/rdb.c index c6a1ec6918ecae8079fafdf82800680d3c5d5b7d..b8e02b0219a3fd32d37b80e52ee5e8a71ab12a5e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1347,6 +1347,8 @@ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { redisLog(REDIS_WARNING, "Slave %llu correctly received the streamed RDB file.", slave->id); + /* Restore the socket as non-blocking. */ + anetNonBlock(NULL,slave->fd); } } } @@ -1408,6 +1410,10 @@ int rdbSaveToSlavesSockets(void) { clientids[numfds] = slave->id; fds[numfds++] = slave->fd; slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + /* Put the socket in non-blocking mode to simplify RDB transfer. + * We'll restore it when the children returns (since duped socket + * will share the O_NONBLOCK attribute with the parent). */ + anetBlock(NULL,slave->fd); } } diff --git a/src/replication.c b/src/replication.c index fa5ed87d70f9a2008caffe6ee75ef5bd7636d84f..ea8265e38698e7e18ff158c5f37802b5107da2af 100644 --- a/src/replication.c +++ b/src/replication.c @@ -888,6 +888,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } nread = read(fd,buf,readlen); + printf("NREAD %d (%d)\n", (int)nread, (int)readlen); if (nread <= 0) { redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); diff --git a/src/rio.c b/src/rio.c index 5153ed28e2015d57e3fce83a01a87f5bbcf993d1..3513e18899eff8f9d6a55493aa9ddeda9b3641c8 100644 --- a/src/rio.c +++ b/src/rio.c @@ -197,8 +197,17 @@ static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) { broken++; continue; } - retval = write(r->io.fdset.fds[j],p,count); - if (retval != count) { + + /* Make sure to write 'count' bytes to the socket regardless + * of short writes. */ + size_t nwritten = 0; + while(nwritten != count) { + retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); + if (retval <= 0) break; + nwritten += retval; + } + + if (nwritten != count) { /* Mark this FD as broken. */ r->io.fdset.state[j] = errno; if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;