提交 45d4e06e 编写于 作者: A antirez

replicationFeedSlaves() reworked for correctness and speed.

The previous code using a static buffer as an optimization was lame:

1) Premature optimization: it was actually *slower* than naive code
   because it resulted in the creation / destruction of the object
   encapsulating the output buffer.
2) The code was very hard to test, since it required specific
   tests for command lines exceeding the size of the static buffer.
3) As a result of "2" the code was buggy, as the current tests were not
   able to stress specific corner cases.

It was replaced with easy-to-understand code that is safer and faster.
上级 3eab283b
......@@ -138,15 +138,11 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
#define FEEDSLAVE_BUF_SIZE (1024*64)
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, i, len;
char buf[FEEDSLAVE_BUF_SIZE], *b = buf;
int j, len;
char llstr[REDIS_LONGSTR_SIZE];
int buf_left = FEEDSLAVE_BUF_SIZE;
robj *o;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
......@@ -155,116 +151,66 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* We can't have slaves attached and no backlog. */
redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* What we do here is to try to write as much data as possible in a static
* buffer "buf" that is used to create an object that is later sent to all
* the slaves. This way we do the decoding only one time for most commands
* not containing big payloads. */
/* Create the SELECT command into the static buffer if needed. */
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
char *selectcmd;
size_t sclen;
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid]->ptr;
sclen = sdslen(selectcmd);
memcpy(b,selectcmd,sclen);
b += sclen;
buf_left -= sclen;
selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
sclen = snprintf(b,buf_left,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr);
b += sclen;
buf_left -= sclen;
selectcmd = createObject(REDIS_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
}
server.slaveseldb = dictid;
/* Add the multi bulk reply size to the static buffer, that is, the number
* of arguments of the command to send to every slave. */
b[0] = '*';
len = ll2string(b+1,REDIS_LONGSTR_SIZE,argc);
b += len+1;
buf_left -= len+1;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
/* Try to use the static buffer for as much arguments is possible. */
for (j = 0; j < argc; j++) {
int objlen;
char *objptr;
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
if (argv[j]->encoding != REDIS_ENCODING_RAW &&
argv[j]->encoding != REDIS_ENCODING_INT) {
redisPanic("Unexpected encoding");
}
if (argv[j]->encoding == REDIS_ENCODING_RAW) {
objlen = sdslen(argv[j]->ptr);
objptr = argv[j]->ptr;
} else {
objlen = ll2string(llstr,REDIS_LONGSTR_SIZE,(long)argv[j]->ptr);
objptr = llstr;
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
addReply(slave,selectcmd);
}
/* We need enough space for bulk reply encoding, newlines, and
* the data itself. */
if (buf_left < objlen+REDIS_LONGSTR_SIZE+32) break;
/* Write $...CRLF */
b[0] = '$';
len = ll2string(b+1,REDIS_LONGSTR_SIZE,objlen);
b += len+1;
buf_left -= len+1;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
/* And data plus CRLF */
memcpy(b,objptr,objlen);
b += objlen;
buf_left -= objlen;
b[0] = '\r';
b[1] = '\n';
b += 2;
buf_left -= 2;
}
/* Create an object with the static buffer content. */
redisAssert(buf_left < FEEDSLAVE_BUF_SIZE);
o = createStringObject(buf,b-buf);
/* If we have a backlog, populate it with data and increment
* the global replication offset. */
if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
feedReplicationBacklogWithObject(o);
for (i = j; i < argc; i++) {
char aux[REDIS_LONGSTR_SIZE+3];
long objlen = stringObjectLen(argv[i]);
char aux[REDIS_LONGSTR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux-1),argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* ad add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,objlen,sizeof(aux)-1);
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[i]);
feedReplicationBacklogWithObject(shared.crlf);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklogWithObject(aux+len+1,2);
}
}
/* Write data to slaves. Here we do two things:
* 1) We write the "o" object that was created using the accumulated
* static buffer.
* 2) We write any additional argument of the command to replicate that
* was not written inside the static buffer for lack of space.
*/
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
redisClient *slave = ln->value;
......@@ -276,15 +222,14 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* First, trasmit the object created from the static buffer. */
addReply(slave,o);
/* Add the multi bulk length. */
addReplyMultiBulkLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (i = j; i < argc; i++)
addReplyBulk(slave,argv[i]);
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
decrRefCount(o);
}
void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册