提交 7271198c 编写于 作者: P Pieter Noordhuis

Use rio.h functions in aof.c

上级 fd535c58
#include "redis.h"
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
......@@ -7,6 +6,7 @@
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include "rio.h"
/* Called when the user switches from "appendonly yes" to "appendonly no"
* at runtime using the CONFIG command. */
......@@ -313,11 +313,26 @@ fmterr:
exit(1);
}
/* Delegate writing an object to writing a bulk string or bulk long long.
* This is not placed in rio.c since that adds the redis.h dependency. */
int rioWriteBulkObject(rio *r, robj *obj) {
/* Avoid using getDecodedObject to help copy-on-write (we are often
* in a child process when this function is called). */
if (obj->encoding == REDIS_ENCODING_INT) {
return rioWriteBulkLongLong(r,(long)obj->ptr);
} else if (obj->encoding == REDIS_ENCODING_RAW) {
return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
} else {
redisPanic("Unknown string encoding");
}
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
int rewriteAppendOnlyFile(char *filename) {
dictIterator *di = NULL;
dictEntry *de;
rio aof;
FILE *fp;
char tmpfile[256];
int j;
......@@ -331,6 +346,8 @@ int rewriteAppendOnlyFile(char *filename) {
redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
return REDIS_ERR;
}
aof = rioInitWithFile(fp);
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
......@@ -343,8 +360,8 @@ int rewriteAppendOnlyFile(char *filename) {
}
/* SELECT the new DB */
if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkLongLong(fp,j) == 0) goto werr;
if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
......@@ -362,10 +379,10 @@ int rewriteAppendOnlyFile(char *filename) {
if (o->type == REDIS_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,o) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,o) == 0) goto werr;
} else if (o->type == REDIS_LIST) {
/* Emit the RPUSHes needed to rebuild the list */
char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
......@@ -377,13 +394,13 @@ int rewriteAppendOnlyFile(char *filename) {
long long vlong;
while(ziplistGet(p,&vstr,&vlen,&vlong)) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (vstr) {
if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
goto werr;
} else {
if (fwriteBulkLongLong(fp,vlong) == 0)
if (rioWriteBulkLongLong(&aof,vlong) == 0)
goto werr;
}
p = ziplistNext(zl,p);
......@@ -397,9 +414,9 @@ int rewriteAppendOnlyFile(char *filename) {
while((ln = listNext(&li))) {
robj *eleobj = listNodeValue(ln);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
}
} else {
redisPanic("Unknown list encoding");
......@@ -412,18 +429,18 @@ int rewriteAppendOnlyFile(char *filename) {
int ii = 0;
int64_t llval;
while(intsetGet(o->ptr,ii++,&llval)) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr;
}
} else if (o->encoding == REDIS_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *eleobj = dictGetEntryKey(de);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
}
dictReleaseIterator(di);
} else {
......@@ -450,14 +467,14 @@ int rewriteAppendOnlyFile(char *filename) {
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkDouble(fp,score) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkDouble(&aof,score) == 0) goto werr;
if (vstr != NULL) {
if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
goto werr;
} else {
if (fwriteBulkLongLong(fp,vll) == 0)
if (rioWriteBulkLongLong(&aof,vll) == 0)
goto werr;
}
zzlNext(zl,&eptr,&sptr);
......@@ -471,10 +488,10 @@ int rewriteAppendOnlyFile(char *filename) {
robj *eleobj = dictGetEntryKey(de);
double *score = dictGetEntryVal(de);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkDouble(fp,*score) == 0) goto werr;
if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkDouble(&aof,*score) == 0) goto werr;
if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
}
dictReleaseIterator(di);
} else {
......@@ -490,11 +507,11 @@ int rewriteAppendOnlyFile(char *filename) {
unsigned int flen, vlen;
while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkString(fp,(char*)field,flen) == 0)
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkString(&aof,(char*)field,flen) == 0)
goto werr;
if (fwriteBulkString(fp,(char*)val,vlen) == 0)
if (rioWriteBulkString(&aof,(char*)val,vlen) == 0)
goto werr;
}
} else {
......@@ -505,10 +522,10 @@ int rewriteAppendOnlyFile(char *filename) {
robj *field = dictGetEntryKey(de);
robj *val = dictGetEntryVal(de);
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkObject(fp,field) == 0) goto werr;
if (fwriteBulkObject(fp,val) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkObject(&aof,field) == 0) goto werr;
if (rioWriteBulkObject(&aof,val) == 0) goto werr;
}
dictReleaseIterator(di);
}
......@@ -520,9 +537,9 @@ int rewriteAppendOnlyFile(char *filename) {
char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
/* If this key is already expired skip it */
if (expiretime < now) continue;
if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
}
}
dictReleaseIterator(di);
......
......@@ -841,11 +841,6 @@ unsigned long estimateObjectIdleTime(robj *o);
int syncWrite(int fd, char *ptr, ssize_t size, int timeout);
int syncRead(int fd, char *ptr, ssize_t size, int timeout);
int syncReadLine(int fd, char *ptr, ssize_t size, int timeout);
int fwriteBulkString(FILE *fp, char *s, unsigned long len);
int fwriteBulkDouble(FILE *fp, double d);
int fwriteBulkLongLong(FILE *fp, long long l);
int fwriteBulkObject(FILE *fp, robj *obj);
int fwriteBulkCount(FILE *fp, char prefix, int count);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
......
......@@ -99,70 +99,3 @@ int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
}
return nread;
}
/* ----------------- Blocking sockets I/O with timeouts --------------------- */
/* Write binary-safe string into a file in the bulkformat
* $<count>\r\n<payload>\r\n */
int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
char cbuf[128];
int clen;
cbuf[0] = '$';
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (fwrite(cbuf,clen,1,fp) == 0) return 0;
if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
if (fwrite("\r\n",2,1,fp) == 0) return 0;
return 1;
}
/* Write a multi bulk count in the form "*<count>\r\n" */
int fwriteBulkCount(FILE *fp, char prefix, int count) {
char cbuf[128];
int clen;
cbuf[0] = prefix;
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (fwrite(cbuf,clen,1,fp) == 0) return 0;
return 1;
}
/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
int fwriteBulkDouble(FILE *fp, double d) {
char buf[128], dbuf[128];
snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
return 1;
}
/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
int fwriteBulkLongLong(FILE *fp, long long l) {
char bbuf[128], lbuf[128];
unsigned int blen, llen;
llen = ll2string(lbuf,32,l);
blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
if (fwrite(bbuf,blen,1,fp) == 0) return 0;
return 1;
}
/* Delegate writing an object to writing a bulk string or bulk long long. */
int fwriteBulkObject(FILE *fp, robj *obj) {
/* Avoid using getDecodedObject to help copy-on-write (we are often
* in a child process when this function is called). */
if (obj->encoding == REDIS_ENCODING_INT) {
return fwriteBulkLongLong(fp,(long)obj->ptr);
} else if (obj->encoding == REDIS_ENCODING_RAW) {
return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
} else {
redisPanic("Unknown string encoding");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册