diff --git a/src/aof.c b/src/aof.c index ef72a2b169a007387179e5d24d4830fcbb50e047..aadf644811bdf76c924ed9518413b11c14b4d1fe 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1,5 +1,4 @@ #include "redis.h" - #include #include #include @@ -7,6 +6,7 @@ #include #include #include +#include "rio.h" /* Called when the user switches from "appendonly yes" to "appendonly no" * at runtime using the CONFIG command. */ @@ -313,11 +313,26 @@ fmterr: exit(1); } +/* Delegate writing an object to writing a bulk string or bulk long long. + * This is not placed in rio.c since that adds the redis.h dependency. */ +int rioWriteBulkObject(rio *r, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return rioWriteBulkLongLong(r,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } +} + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; + rio aof; FILE *fp; char tmpfile[256]; int j; @@ -331,6 +346,8 @@ int rewriteAppendOnlyFile(char *filename) { redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno)); return REDIS_ERR; } + + aof = rioInitWithFile(fp); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; @@ -343,8 +360,8 @@ int rewriteAppendOnlyFile(char *filename) { } /* SELECT the new DB */ - if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLongLong(fp,j) == 0) goto werr; + if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@ -362,10 +379,10 @@ int rewriteAppendOnlyFile(char *filename) { if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,o) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; @@ -377,13 +394,13 @@ int rewriteAppendOnlyFile(char *filename) { long long vlong; while(ziplistGet(p,&vstr,&vlen,&vlong)) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (vstr) { - if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) goto werr; } else { - if (fwriteBulkLongLong(fp,vlong) == 0) + if (rioWriteBulkLongLong(&aof,vlong) == 0) goto werr; } p = ziplistNext(zl,p); @@ -397,9 +414,9 @@ int rewriteAppendOnlyFile(char *filename) { while((ln = listNext(&li))) { robj *eleobj = listNodeValue(ln); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } } else { redisPanic("Unknown list encoding"); @@ -412,18 +429,18 @@ int rewriteAppendOnlyFile(char *filename) { int ii = 0; int64_t llval; while(intsetGet(o->ptr,ii++,&llval)) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkLongLong(fp,llval) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr; } } else if (o->encoding == REDIS_ENCODING_HT) { dictIterator *di = dictGetIterator(o->ptr); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else { @@ -450,14 +467,14 @@ int rewriteAppendOnlyFile(char *filename) { redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); score = zzlGetScore(sptr); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkDouble(fp,score) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkDouble(&aof,score) == 0) goto werr; if (vstr != NULL) { - if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) goto werr; } else { - if (fwriteBulkLongLong(fp,vll) == 0) + if (rioWriteBulkLongLong(&aof,vll) == 0) goto werr; } zzlNext(zl,&eptr,&sptr); @@ -471,10 +488,10 @@ int rewriteAppendOnlyFile(char *filename) { robj *eleobj = dictGetEntryKey(de); double *score = dictGetEntryVal(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkDouble(fp,*score) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkDouble(&aof,*score) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else { @@ -490,11 +507,11 @@ int rewriteAppendOnlyFile(char *filename) { unsigned int flen, vlen; while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkString(fp,(char*)field,flen) == 0) + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkString(&aof,(char*)field,flen) == 0) goto werr; - if (fwriteBulkString(fp,(char*)val,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)val,vlen) == 0) goto werr; } } else { @@ -505,10 +522,10 @@ int rewriteAppendOnlyFile(char *filename) { robj *field = dictGetEntryKey(de); robj *val = dictGetEntryVal(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,field) == 0) goto werr; - if (fwriteBulkObject(fp,val) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,field) == 0) goto werr; + if (rioWriteBulkObject(&aof,val) == 0) goto werr; } dictReleaseIterator(di); } @@ -520,9 +537,9 @@ int rewriteAppendOnlyFile(char *filename) { char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n"; /* If this key is already expired skip it */ if (expiretime < now) continue; - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } dictReleaseIterator(di); diff --git a/src/redis.h b/src/redis.h index 3fd13e2ed18cd63397113bb7794896289549de76..5af2da8e96fe8ec9748fc7a2fa1b6cbc3edf29b2 100644 --- a/src/redis.h +++ b/src/redis.h @@ -841,11 +841,6 @@ unsigned long estimateObjectIdleTime(robj *o); int syncWrite(int fd, char *ptr, ssize_t size, int timeout); int syncRead(int fd, char *ptr, ssize_t size, int timeout); int syncReadLine(int fd, char *ptr, ssize_t size, int timeout); -int fwriteBulkString(FILE *fp, char *s, unsigned long len); -int fwriteBulkDouble(FILE *fp, double d); -int fwriteBulkLongLong(FILE *fp, long long l); -int fwriteBulkObject(FILE *fp, robj *obj); -int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); diff --git a/src/syncio.c b/src/syncio.c index 3d0e0451bfd41011bab2d0a524ef66dbceb58e16..9958363be23382c4bec7e520ce8a22d20a480271 100644 --- a/src/syncio.c +++ b/src/syncio.c @@ -99,70 +99,3 @@ int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) { } return nread; } - -/* ----------------- Blocking sockets I/O with timeouts --------------------- */ - -/* Write binary-safe string into a file in the bulkformat - * $\r\n\r\n */ -int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char cbuf[128]; - int clen; - - cbuf[0] = '$'; - clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); - cbuf[clen++] = '\r'; - cbuf[clen++] = '\n'; - if (fwrite(cbuf,clen,1,fp) == 0) return 0; - if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; - if (fwrite("\r\n",2,1,fp) == 0) return 0; - return 1; -} - -/* Write a multi bulk count in the form "*\r\n" */ -int fwriteBulkCount(FILE *fp, char prefix, int count) { - char cbuf[128]; - int clen; - - cbuf[0] = prefix; - clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count); - cbuf[clen++] = '\r'; - cbuf[clen++] = '\n'; - if (fwrite(cbuf,clen,1,fp) == 0) return 0; - return 1; -} - -/* Write a double value in bulk format $\r\n\r\n */ -int fwriteBulkDouble(FILE *fp, double d) { - char buf[128], dbuf[128]; - - snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0; - return 1; -} - -/* Write a long value in bulk format $\r\n\r\n */ -int fwriteBulkLongLong(FILE *fp, long long l) { - char bbuf[128], lbuf[128]; - unsigned int blen, llen; - llen = ll2string(lbuf,32,l); - blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); - if (fwrite(bbuf,blen,1,fp) == 0) return 0; - return 1; -} - -/* Delegate writing an object to writing a bulk string or bulk long long. */ -int fwriteBulkObject(FILE *fp, robj *obj) { - /* Avoid using getDecodedObject to help copy-on-write (we are often - * in a child process when this function is called). */ - if (obj->encoding == REDIS_ENCODING_INT) { - return fwriteBulkLongLong(fp,(long)obj->ptr); - } else if (obj->encoding == REDIS_ENCODING_RAW) { - return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); - } else { - redisPanic("Unknown string encoding"); - } -} - -