未验证 提交 967a98f5 编写于 作者: S Salvatore Sanfilippo 提交者: GitHub

Merge pull request #4811 from oranagra/cli-diskless-repl

Add redis-cli support for diskless replication (CAPA EOF)
......@@ -6152,9 +6152,31 @@ static void latencyDistMode(void) {
* Slave mode
*--------------------------------------------------------------------------- */
#define RDB_EOF_MARK_SIZE 40
void sendReplconf(const char* arg1, const char* arg2) {
printf("sending REPLCONF %s %s\n", arg1, arg2);
redisReply *reply = redisCommand(context, "REPLCONF %s %s", arg1, arg2);
/* Handle any error conditions */
if(reply == NULL) {
fprintf(stderr, "\nI/O error\n");
exit(1);
} else if(reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "REPLCONF %s error: %s\n", arg1, reply->str);
/* non fatal, old versions may not support it */
}
freeReplyObject(reply);
}
void sendCapa() {
sendReplconf("capa", "eof");
}
/* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB(). */
unsigned long long sendSync(int fd) {
* slaveMode() and getRDB().
* returns 0 in case an EOF marker is used. */
unsigned long long sendSync(int fd, char *out_eof) {
/* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed
......@@ -6184,17 +6206,33 @@ unsigned long long sendSync(int fd) {
printf("SYNC with master failed: %s\n", buf);
exit(1);
}
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
return 0;
}
return strtoull(buf+1,NULL,10);
}
static void slaveMode(void) {
int fd = context->fd;
unsigned long long payload = sendSync(fd);
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(fd, eofmark);
char buf[1024];
int original_output = config.output;
fprintf(stderr,"SYNC with master, discarding %llu "
"bytes of bulk transfer...\n", payload);
if (payload == 0) {
payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC with master, discarding "
"bytes of bulk transfer until EOF marker...\n");
} else {
fprintf(stderr,"SYNC with master, discarding %llu "
"bytes of bulk transfer...\n", payload);
}
/* Discard the payload. */
while(payload) {
......@@ -6206,8 +6244,29 @@ static void slaveMode(void) {
exit(1);
}
payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
}
fprintf(stderr,"SYNC done. Logging commands from master.\n");
if (usemark) {
unsigned long long offset = ULLONG_MAX - payload;
fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
/* put the slave online */
sleep(1);
sendReplconf("ACK", "0");
} else
fprintf(stderr,"SYNC done. Logging commands from master.\n");
/* Now we can use hiredis to read the incoming protocol. */
config.output = OUTPUT_CSV;
......@@ -6224,11 +6283,22 @@ static void slaveMode(void) {
static void getRDB(void) {
int s = context->fd;
int fd;
unsigned long long payload = sendSync(s);
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(s, eofmark);
char buf[4096];
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
payload, config.rdb_filename);
if (payload == 0) {
payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC sent to master, writing bytes of bulk transfer until EOF marker to '%s'\n",
config.rdb_filename);
} else {
fprintf(stderr,"SYNC sent to master, writing %llu bytes to '%s'\n",
payload, config.rdb_filename);
}
/* Write to file. */
if (!strcmp(config.rdb_filename,"-")) {
......@@ -6257,11 +6327,31 @@ static void getRDB(void) {
exit(1);
}
payload -= nread;
if (usemark) {
/* Update the last bytes array, and check if it matches our delimiter.*/
if (nread >= RDB_EOF_MARK_SIZE) {
memcpy(lastbytes,buf+nread-RDB_EOF_MARK_SIZE,RDB_EOF_MARK_SIZE);
} else {
int rem = RDB_EOF_MARK_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,RDB_EOF_MARK_SIZE) == 0)
break;
}
}
if (usemark) {
payload = ULLONG_MAX - payload - RDB_EOF_MARK_SIZE;
if (ftruncate(fd, payload) == -1)
fprintf(stderr,"ftruncate failed: %s.\n", strerror(errno));
fprintf(stderr,"Transfer finished with success after %llu bytes\n", payload);
} else {
fprintf(stderr,"Transfer finished with success.\n");
}
close(s); /* Close the file descriptor ASAP as fsync() may take time. */
fsync(fd);
close(fd);
fprintf(stderr,"Transfer finished with success.\n");
exit(0);
}
......@@ -7314,12 +7404,14 @@ int main(int argc, char **argv) {
/* Slave mode */
if (config.slave_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
slaveMode();
}
/* Get RDB mode. */
if (config.getrdb_mode) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
getRDB();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册