提交 4fe7672d 编写于 作者: A antirez

PSYNC: don't use the client buffer to send +CONTINUE and +FULLRESYNC.

When we are preparing an handshake with the slave we can't touch the
connection buffer as it'll be used to accumulate differences between
the sent RDB file and what arrives next from clients.

So in short we can't use addReply() family functions.

However we just use write(2) because we know that the socket buffer is
empty, since a prerequisite for SYNC to work is that the static buffer
and the output list are empty, and in general it is not expected that a
client SYNCs after doing some heavy I/O with the master.

However a short write connection is explicitly handled to avoid
fragility (we simply close the connection and the slave will retry).
上级 8529dd21
...@@ -383,6 +383,8 @@ long long addReplyReplicationBacklog(redisClient *c, long long offset) { ...@@ -383,6 +383,8 @@ long long addReplyReplicationBacklog(redisClient *c, long long offset) {
int masterTryPartialResynchronization(redisClient *c) { int masterTryPartialResynchronization(redisClient *c) {
long long psync_offset, psync_len; long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr; char *master_runid = c->argv[1]->ptr;
char buf[128];
int buflen;
/* Is the runid of this master the same advertised by the wannabe slave /* Is the runid of this master the same advertised by the wannabe slave
* via PSYNC? If runid changed this master is a different instance and * via PSYNC? If runid changed this master is a different instance and
...@@ -418,7 +420,14 @@ int masterTryPartialResynchronization(redisClient *c) { ...@@ -418,7 +420,14 @@ int masterTryPartialResynchronization(redisClient *c) {
c->flags |= REDIS_SLAVE; c->flags |= REDIS_SLAVE;
c->replstate = REDIS_REPL_ONLINE; c->replstate = REDIS_REPL_ONLINE;
listAddNodeTail(server.slaves,c); listAddNodeTail(server.slaves,c);
addReplySds(c,sdsnew("+CONTINUE\r\n")); /* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* emtpy so this write will never fail actually. */
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
psync_len = addReplyReplicationBacklog(c,psync_offset); psync_len = addReplyReplicationBacklog(c,psync_offset);
redisLog(REDIS_NOTICE, redisLog(REDIS_NOTICE,
"Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset); "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
...@@ -434,10 +443,13 @@ need_full_resync: ...@@ -434,10 +443,13 @@ need_full_resync:
/* Add 1 to psync_offset if it the replication backlog does not exists /* Add 1 to psync_offset if it the replication backlog does not exists
* as when it will be created later we'll increment the offset by one. */ * as when it will be created later we'll increment the offset by one. */
if (server.repl_backlog == NULL) psync_offset++; if (server.repl_backlog == NULL) psync_offset++;
addReplySds(c, /* Again, we can't use the connection buffers (see above). */
sdscatprintf(sdsempty(),"+FULLRESYNC %s %lld\r\n", buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid, server.runid,psync_offset);
psync_offset)); if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return REDIS_OK;
}
return REDIS_ERR; return REDIS_ERR;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册