提交 b699c8ea 编写于 作者: A antirez

Diskless replication: EOF:<mark> streaming support slave side.

上级 1472c682
......@@ -818,6 +818,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
/* Static vars used to hold the EOF mark, and the last bytes received
* form the server: when they match, we reached the end of the transfer. */
static char eofmark[REDIS_RUN_ID_SIZE];
static char lastbytes[REDIS_RUN_ID_SIZE];
static int usemark = 0;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
......@@ -843,16 +849,41 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
/* There are two possible forms for the bulk payload. One is the
* usual $<count> bulk format. The other is used for diskless transfers
* when the master does not know beforehand the size of the file to
* transfer. In the latter case, the following format is used:
*
* $EOF:<40 bytes delimiter>
*
* At the end of the file the announced delimiter is transmitted. The
* delimiter is long and random enough that the probability of a
* collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
memset(lastbytes,0,REDIS_RUN_ID_SIZE);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving streamed RDB from master");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
}
return;
}
/* Read bulk data */
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
if (usemark) {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
} else {
readlen = sizeof(buf);
}
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
......@@ -860,6 +891,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
replicationAbortSyncTransfer();
return;
}
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter. */
if (nread >= REDIS_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
} else {
int rem = REDIS_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
}
server.repl_transfer_lastio = server.unixtime;
if (write(server.repl_transfer_fd,buf,nread) != nread) {
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
......@@ -881,7 +929,12 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
}
/* Check if the transfer is now complete */
if (server.repl_transfer_read == server.repl_transfer_size) {
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
if (eof_reached) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册