diff --git a/src/aof.c b/src/aof.c index 25febb91fd3743ed744924b3556550d4f8c0df83..167134818ac3292c8095a205378fa9e15bf110da 100644 --- a/src/aof.c +++ b/src/aof.c @@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) { c->querybuf = sdsempty(); c->argc = 0; c->argv = NULL; + c->bufpos = 0; c->flags = 0; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ @@ -272,12 +273,14 @@ int loadAppendOnlyFile(char *filename) { fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient); - /* Discard the reply objects list from the fake client */ - while(listLength(fakeClient->reply)) - listDelNode(fakeClient->reply,listFirst(fakeClient->reply)); + + /* The fake client should not have a reply */ + redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); + /* Clean up, ready for the next command */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); + /* Handle swapping while loading big datasets when VM is on */ force_swapout = 0; if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) @@ -629,12 +632,11 @@ int rewriteAppendOnlyFileBackground(void) { void bgrewriteaofCommand(redisClient *c) { if (server.bgrewritechildpid != -1) { - addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n")); + addReplyError(c,"Background append only file rewriting already in progress"); return; } if (rewriteAppendOnlyFileBackground() == REDIS_OK) { - char *status = "+Background append only file rewriting started\r\n"; - addReplySds(c,sdsnew(status)); + addReplyStatus(c,"Background append only file rewriting started"); } else { addReply(c,shared.err); } diff --git a/src/config.c b/src/config.c index e1b743dbf41ace2c6b41a9b10ef1cd65aac0455b..8a5ad6c2bc8cd6d0671b2194dad057d23ae7ac7f 100644 --- a/src/config.c +++ b/src/config.c @@ -270,8 +270,8 @@ void configSetCommand(redisClient *c) { stopAppendOnly(); } else { if (startAppendOnly() == REDIS_ERR) { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Unable to turn on AOF. Check server logs.\r\n")); + addReplyError(c, + "Unable to turn on AOF. Check server logs."); decrRefCount(o); return; } @@ -312,9 +312,8 @@ void configSetCommand(redisClient *c) { } sdsfreesplitres(v,vlen); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR not supported CONFIG parameter %s\r\n", - (char*)c->argv[2]->ptr)); + addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", + (char*)c->argv[2]->ptr); decrRefCount(o); return; } @@ -323,22 +322,18 @@ void configSetCommand(redisClient *c) { return; badfmt: /* Bad format errors */ - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'", (char*)o->ptr, - (char*)c->argv[2]->ptr)); + (char*)c->argv[2]->ptr); decrRefCount(o); } void configGetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[2]); - robj *lenobj = createObject(REDIS_STRING,NULL); + void *replylen = addDeferredMultiBulkLength(c); char *pattern = o->ptr; int matches = 0; - addReply(c,lenobj); - decrRefCount(lenobj); - if (stringmatch(pattern,"dbfilename",0)) { addReplyBulkCString(c,"dbfilename"); addReplyBulkCString(c,server.dbfilename); @@ -410,7 +405,7 @@ void configGetCommand(redisClient *c) { matches++; } decrRefCount(o); - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); + setDeferredMultiBulkLength(c,replylen,matches*2); } void configCommand(redisClient *c) { @@ -428,13 +423,12 @@ void configCommand(redisClient *c) { server.stat_starttime = time(NULL); addReply(c,shared.ok); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n")); + addReplyError(c, + "CONFIG subcommand must be one of GET, SET, RESETSTAT"); } return; badarity: - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Wrong number of arguments for CONFIG %s\r\n", - (char*) c->argv[1]->ptr)); + addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s", + (char*) c->argv[1]->ptr); } diff --git a/src/db.c b/src/db.c index afca3cddd12eb2b9a3f5d3df6aa684c389b16d97..470310a304c279536efbf6ddc313a9a3eea05714 100644 --- a/src/db.c +++ b/src/db.c @@ -204,7 +204,7 @@ void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); + addReplyError(c,"invalid DB index"); } else { addReply(c,shared.ok); } @@ -228,11 +228,9 @@ void keysCommand(redisClient *c) { sds pattern = c->argv[1]->ptr; int plen = sdslen(pattern), allkeys; unsigned long numkeys = 0; - robj *lenobj = createObject(REDIS_STRING,NULL); + void *replylen = addDeferredMultiBulkLength(c); di = dictGetIterator(c->db->dict); - addReply(c,lenobj); - decrRefCount(lenobj); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { sds key = dictGetEntryKey(de); @@ -248,17 +246,15 @@ void keysCommand(redisClient *c) { } } dictReleaseIterator(di); - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys); + setDeferredMultiBulkLength(c,replylen,numkeys); } void dbsizeCommand(redisClient *c) { - addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); + addReplyLongLong(c,dictSize(c->db->dict)); } void lastsaveCommand(redisClient *c) { - addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave)); + addReplyLongLong(c,server.lastsave); } void typeCommand(redisClient *c) { @@ -267,24 +263,23 @@ void typeCommand(redisClient *c) { o = lookupKeyRead(c->db,c->argv[1]); if (o == NULL) { - type = "+none"; + type = "none"; } else { switch(o->type) { - case REDIS_STRING: type = "+string"; break; - case REDIS_LIST: type = "+list"; break; - case REDIS_SET: type = "+set"; break; - case REDIS_ZSET: type = "+zset"; break; - case REDIS_HASH: type = "+hash"; break; - default: type = "+unknown"; break; + case REDIS_STRING: type = "string"; break; + case REDIS_LIST: type = "list"; break; + case REDIS_SET: type = "set"; break; + case REDIS_ZSET: type = "zset"; break; + case REDIS_HASH: type = "hash"; break; + default: type = "unknown"; break; } } - addReplySds(c,sdsnew(type)); - addReply(c,shared.crlf); + addReplyStatus(c,type); } void saveCommand(redisClient *c) { if (server.bgsavechildpid != -1) { - addReplySds(c,sdsnew("-ERR background save in progress\r\n")); + addReplyError(c,"Background save already in progress"); return; } if (rdbSave(server.dbfilename) == REDIS_OK) { @@ -296,12 +291,11 @@ void saveCommand(redisClient *c) { void bgsaveCommand(redisClient *c) { if (server.bgsavechildpid != -1) { - addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); + addReplyError(c,"Background save already in progress"); return; } if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { - char *status = "+Background saving started\r\n"; - addReplySds(c,sdsnew(status)); + addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); } @@ -310,7 +304,7 @@ void bgsaveCommand(redisClient *c) { void shutdownCommand(redisClient *c) { if (prepareForShutdown() == REDIS_OK) exit(0); - addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); + addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); } void renameGenericCommand(redisClient *c, int nx) { diff --git a/src/debug.c b/src/debug.c index 76d18b214964170a6162636f26cd591063f4ff04..2f7ab58f1f4b5926186627adc278ced985d4d327 100644 --- a/src/debug.c +++ b/src/debug.c @@ -211,18 +211,18 @@ void debugCommand(redisClient *c) { char *strenc; strenc = strEncoding(val->encoding); - addReplySds(c,sdscatprintf(sdsempty(), - "+Value at:%p refcount:%d " - "encoding:%s serializedlength:%lld\r\n", + addReplyStatusFormat(c, + "Value at:%p refcount:%d " + "encoding:%s serializedlength:%lld", (void*)val, val->refcount, - strenc, (long long) rdbSavedObjectLen(val,NULL))); + strenc, (long long) rdbSavedObjectLen(val,NULL)); } else { vmpointer *vp = (vmpointer*) val; - addReplySds(c,sdscatprintf(sdsempty(), - "+Value swapped at: page %llu " - "using %llu pages\r\n", + addReplyStatusFormat(c, + "Value swapped at: page %llu " + "using %llu pages", (unsigned long long) vp->page, - (unsigned long long) vp->usedpages)); + (unsigned long long) vp->usedpages); } } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { lookupKeyRead(c->db,c->argv[2]); @@ -233,7 +233,7 @@ void debugCommand(redisClient *c) { vmpointer *vp; if (!server.vm_enabled) { - addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); + addReplyError(c,"Virtual Memory is disabled"); return; } if (!de) { @@ -243,9 +243,9 @@ void debugCommand(redisClient *c) { val = dictGetEntryVal(de); /* Swap it */ if (val->storage != REDIS_VM_MEMORY) { - addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); + addReplyError(c,"This key is not in memory"); } else if (val->refcount != 1) { - addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + addReplyError(c,"Object is shared"); } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { dictGetEntryVal(de) = vp; addReply(c,shared.ok); @@ -274,18 +274,17 @@ void debugCommand(redisClient *c) { addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { unsigned char digest[20]; - sds d = sdsnew("+"); + sds d = sdsempty(); int j; computeDatasetDigest(digest); for (j = 0; j < 20; j++) d = sdscatprintf(d, "%02x",digest[j]); - - d = sdscatlen(d,"\r\n",2); - addReplySds(c,d); + addReplyStatus(c,d); + sdsfree(d); } else { - addReplySds(c,sdsnew( - "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]\r\n")); + addReplyError(c, + "Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]"); } } diff --git a/src/multi.c b/src/multi.c index def1dd67326b546d4e96a7c88ee7ebf53fc06a51..47615eb04ff18c7872964600365580d6dec59ea1 100644 --- a/src/multi.c +++ b/src/multi.c @@ -42,7 +42,7 @@ void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { void multiCommand(redisClient *c) { if (c->flags & REDIS_MULTI) { - addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + addReplyError(c,"MULTI calls can not be nested"); return; } c->flags |= REDIS_MULTI; @@ -51,7 +51,7 @@ void multiCommand(redisClient *c) { void discardCommand(redisClient *c) { if (!(c->flags & REDIS_MULTI)) { - addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n")); + addReplyError(c,"DISCARD without MULTI"); return; } @@ -82,7 +82,7 @@ void execCommand(redisClient *c) { int orig_argc; if (!(c->flags & REDIS_MULTI)) { - addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n")); + addReplyError(c,"EXEC without MULTI"); return; } @@ -107,7 +107,7 @@ void execCommand(redisClient *c) { unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); + addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; @@ -251,7 +251,7 @@ void watchCommand(redisClient *c) { int j; if (c->flags & REDIS_MULTI) { - addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + addReplyError(c,"WATCH inside MULTI is not allowed"); return; } for (j = 1; j < c->argc; j++) diff --git a/src/networking.c b/src/networking.c index 104444f090545a1f56166a9dd8c852be8bd51b19..96ce5a99712ec3f21a655cd49cb77af2650b5386 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1,5 +1,4 @@ #include "redis.h" - #include void *dupClientReplyValue(void *o) { @@ -12,7 +11,12 @@ int listMatchObjects(void *a, void *b) { } redisClient *createClient(int fd) { - redisClient *c = zmalloc(sizeof(*c)); + redisClient *c; + + /* Allocate more space to hold a static write buffer. */ + c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES); + c->buflen = REDIS_REPLY_CHUNK_BYTES; + c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); @@ -56,70 +60,238 @@ redisClient *createClient(int fd) { return c; } -void addReply(redisClient *c, robj *obj) { - if (listLength(c->reply) == 0 && +int _ensureFileEvent(redisClient *c) { + if (c->fd <= 0) return REDIS_ERR; + if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c) == AE_ERR) return; + sendReplyToClient, c) == AE_ERR) return REDIS_ERR; + return REDIS_OK; +} +/* Create a duplicate of the last object in the reply list when + * it is not exclusively owned by the reply list. */ +robj *dupLastObjectIfNeeded(list *reply) { + robj *new, *cur; + listNode *ln; + redisAssert(listLength(reply) > 0); + ln = listLast(reply); + cur = listNodeValue(ln); + if (cur->refcount > 1) { + new = dupStringObject(cur); + decrRefCount(cur); + listNodeValue(ln) = new; + } + return listNodeValue(ln); +} + +int _addReplyToBuffer(redisClient *c, char *s, size_t len) { + size_t available = c->buflen-c->bufpos; + + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return REDIS_ERR; + + /* Check that the buffer has enough space available for this string. */ + if (len > available) return REDIS_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + return REDIS_OK; +} + +void _addReplyObjectToList(redisClient *c, robj *o) { + robj *tail; + if (listLength(c->reply) == 0) { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); + } else { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } + } +} + +/* This method takes responsibility over the sds. When it is no longer + * needed it will be free'd, otherwise it ends up in a robj. */ +void _addReplySdsToList(redisClient *c, sds s) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); + sdsfree(s); + } else { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } + } +} + +void _addReplyStringToList(redisClient *c, char *s, size_t len) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createStringObject(s,len)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,len); + } else { + listAddNodeTail(c->reply,createStringObject(s,len)); + } + } +} + +void addReply(redisClient *c, robj *obj) { + if (_ensureFileEvent(c) != REDIS_OK) return; if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { + /* Returns a new object with refcount 1 */ obj = dupStringObject(obj); - obj->refcount = 0; /* getDecodedObject() will increment the refcount */ + } else { + /* This increments the refcount. */ + obj = getDecodedObject(obj); } - listAddNodeTail(c->reply,getDecodedObject(obj)); + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + decrRefCount(obj); } void addReplySds(redisClient *c, sds s) { - robj *o = createObject(REDIS_STRING,s); - addReply(c,o); - decrRefCount(o); + if (_ensureFileEvent(c) != REDIS_OK) { + /* The caller expects the sds to be free'd. */ + sdsfree(s); + return; + } + if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { + sdsfree(s); + } else { + /* This method free's the sds when it is no longer needed. */ + _addReplySdsToList(c,s); + } } -void addReplyDouble(redisClient *c, double d) { - char buf[128]; +void addReplyString(redisClient *c, char *s, size_t len) { + if (_ensureFileEvent(c) != REDIS_OK) return; + if (_addReplyToBuffer(c,s,len) != REDIS_OK) + _addReplyStringToList(c,s,len); +} - snprintf(buf,sizeof(buf),"%.17g",d); - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", - (unsigned long) strlen(buf),buf)); +void _addReplyError(redisClient *c, char *s, size_t len) { + addReplyString(c,"-ERR ",5); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); } -void addReplyLongLong(redisClient *c, long long ll) { - char buf[128]; - size_t len; +void addReplyError(redisClient *c, char *err) { + _addReplyError(c,err,strlen(err)); +} - if (ll == 0) { - addReply(c,shared.czero); - return; - } else if (ll == 1) { - addReply(c,shared.cone); - return; +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyError(c,s,sdslen(s)); + sdsfree(s); +} + +void _addReplyStatus(redisClient *c, char *s, size_t len) { + addReplyString(c,"+",1); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyStatus(redisClient *c, char *status) { + _addReplyStatus(c,status,strlen(status)); +} + +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyStatus(c,s,sdslen(s)); + sdsfree(s); +} + +/* Adds an empty object to the reply list that will contain the multi bulk + * length, which is not known when this function is called. */ +void *addDeferredMultiBulkLength(redisClient *c) { + if (_ensureFileEvent(c) != REDIS_OK) return NULL; + listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); + return listLast(c->reply); +} + +/* Populate the length object and try glueing it to the next chunk. */ +void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { + listNode *ln = (listNode*)node; + robj *len, *next; + + /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ + if (node == NULL) return; + + len = listNodeValue(ln); + len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); + if (ln->next != NULL) { + next = listNodeValue(ln->next); + + /* Only glue when the next node is non-NULL (an sds in this case) */ + if (next->ptr != NULL) { + len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); + listDelNode(c->reply,ln->next); + } } - buf[0] = ':'; +} + +void addReplyDouble(redisClient *c, double d) { + char dbuf[128], sbuf[128]; + int dlen, slen; + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); + addReplyString(c,sbuf,slen); +} + +void _addReplyLongLong(redisClient *c, long long ll, char prefix) { + char buf[128]; + int len; + buf[0] = prefix; len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplySds(c,sdsnewlen(buf,len+3)); + addReplyString(c,buf,len+3); } -void addReplyUlong(redisClient *c, unsigned long ul) { - char buf[128]; - size_t len; +void addReplyLongLong(redisClient *c, long long ll) { + _addReplyLongLong(c,ll,':'); +} - if (ul == 0) { - addReply(c,shared.czero); - return; - } else if (ul == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%lu\r\n",ul); - addReplySds(c,sdsnewlen(buf,len)); +void addReplyMultiBulkLen(redisClient *c, long length) { + _addReplyLongLong(c,length,'*'); } void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len, intlen; - char buf[128]; + size_t len; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -136,11 +308,7 @@ void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - buf[0] = '$'; - intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); - buf[intlen+1] = '\r'; - buf[intlen+2] = '\n'; - addReplySds(c,sdsnewlen(buf,intlen+3)); + _addReplyLongLong(c,len,'$'); } void addReplyBulk(redisClient *c, robj *obj) { @@ -290,34 +458,6 @@ void freeClient(redisClient *c) { zfree(c); } -#define GLUEREPLY_UP_TO (1024) -static void glueReplyBuffersIfNeeded(redisClient *c) { - int copylen = 0; - char buf[GLUEREPLY_UP_TO]; - listNode *ln; - listIter li; - robj *o; - - listRewind(c->reply,&li); - while((ln = listNext(&li))) { - int objlen; - - o = ln->value; - objlen = sdslen(o->ptr); - if (copylen + objlen <= GLUEREPLY_UP_TO) { - memcpy(buf+copylen,o->ptr,objlen); - copylen += objlen; - listDelNode(c->reply,ln); - } else { - if (copylen == 0) return; - break; - } - } - /* Now the output buffer is empty, add the new single element */ - o = createObject(REDIS_STRING,sdsnewlen(buf,copylen)); - listAddNodeHead(c->reply,o); -} - void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@ -334,31 +474,48 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } - while(listLength(c->reply)) { - if (server.glueoutputbuf && listLength(c->reply) > 1) - glueReplyBuffersIfNeeded(c); + while(c->bufpos > 0 || listLength(c->reply)) { + if (c->bufpos > 0) { + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = c->bufpos - c->sentlen; + } else { + nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if (c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); + if (objlen == 0) { + listDelNode(c->reply,listFirst(c->reply)); + continue; + } - if (objlen == 0) { - listDelNode(c->reply,listFirst(c->reply)); - continue; - } + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = objlen - c->sentlen; + } else { + nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; - if (c->flags & REDIS_MASTER) { - /* Don't reply to a master */ - nwritten = objlen - c->sentlen; - } else { - nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - } - c->sentlen += nwritten; - totwritten += nwritten; - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + } } /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve diff --git a/src/object.c b/src/object.c index 92af1d6a9e7ac6f2f5919d0c0ff3f51a856f9e55..c1a0824515bfaf12394520bb493efc7363f4396e 100644 --- a/src/object.c +++ b/src/object.c @@ -354,9 +354,9 @@ int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const ch double value; if (getDoubleFromObject(o, &value) != REDIS_OK) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is not a double\r\n")); + addReplyError(c,"value is not a double"); } return REDIS_ERR; } @@ -393,9 +393,9 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con long long value; if (getLongLongFromObject(o, &value) != REDIS_OK) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n")); + addReplyError(c,"value is not an integer or out of range"); } return REDIS_ERR; } @@ -410,9 +410,9 @@ int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char * if (getLongLongFromObjectOrReply(c, o, &value, msg) != REDIS_OK) return REDIS_ERR; if (value < LONG_MIN || value > LONG_MAX) { if (msg != NULL) { - addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + addReplyError(c,(char*)msg); } else { - addReplySds(c, sdsnew("-ERR value is out of range\r\n")); + addReplyError(c,"value is out of range"); } return REDIS_ERR; } diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 123d81180e76a76dca76fe298d81ff66f34dfe1a..297ecc6c5a87ab28cc8f135fcd7842de4c26f4d4 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -75,6 +75,7 @@ static struct config { long long start; long long totlatency; int *latency; + char *title; list *clients; int quiet; int loop; @@ -206,16 +207,27 @@ static void clientDone(client c) { } } +/* Read a length from the buffer pointed to by *p, store the length in *len, + * and return the number of bytes that the cursor advanced. */ +static int readLen(char *p, int *len) { + char *tail = strstr(p,"\r\n"); + if (tail == NULL) + return 0; + *tail = '\0'; + *len = atoi(p+1); + return tail+2-p; +} + static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[1024]; - int nread; + char buf[1024], *p; + int nread, pos=0, len=0; client c = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(fd); REDIS_NOTUSED(mask); - nread = read(c->fd, buf, 1024); + nread = read(c->fd,buf,sizeof(buf)); if (nread == -1) { fprintf(stderr, "Reading from socket: %s\n", strerror(errno)); freeClient(c); @@ -228,82 +240,89 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) } c->totreceived += nread; c->ibuf = sdscatlen(c->ibuf,buf,nread); + len = sdslen(c->ibuf); -processdata: - /* Are we waiting for the first line of the command of for sdf - * count in bulk or multi bulk operations? */ if (c->replytype == REPLY_INT || - c->replytype == REPLY_RETCODE || - (c->replytype == REPLY_BULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->mbulk == -1)) { - char *p; - - /* Check if the first line is complete. This is only true if - * there is a newline inside the buffer. */ - if ((p = strchr(c->ibuf,'\n')) != NULL) { - if (c->replytype == REPLY_BULK || - (c->replytype == REPLY_MBULK && c->mbulk != -1)) - { - /* Read the count of a bulk reply (being it a single bulk or - * a multi bulk reply). "$" for the protocol spec. */ - *p = '\0'; - *(p-1) = '\0'; - c->readlen = atoi(c->ibuf+1)+2; - // printf("BULK ATOI: %s\n", c->ibuf+1); - /* Handle null bulk reply "$-1" */ - if (c->readlen-2 == -1) { - clientDone(c); - return; - } - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - /* fall through to reach the point where the code will try - * to check if the bulk reply is complete. */ - } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) { - /* Read the count of a multi bulk reply. That is, how many - * bulk replies we have to read next. "*" protocol. */ - *p = '\0'; - *(p-1) = '\0'; - c->mbulk = atoi(c->ibuf+1); - /* Handle null bulk reply "*-1" */ - if (c->mbulk == -1) { - clientDone(c); - return; + c->replytype == REPLY_RETCODE) + { + /* Check if the first line is complete. This is everything we need + * when waiting for an integer or status code reply.*/ + if ((p = strstr(c->ibuf,"\r\n")) != NULL) + goto done; + } else if (c->replytype == REPLY_BULK) { + int advance = 0; + if (c->readlen < 0) { + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + goto done; + } else { + /* include the trailing \r\n */ + c->readlen += 2; } - // printf("%p) %d elements list\n", c, c->mbulk); - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - goto processdata; } else { - c->ibuf = sdstrim(c->ibuf,"\r\n"); - clientDone(c); - return; + goto skip; } } - } - /* bulk read, did we read everything? */ - if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || - (c->replytype == REPLY_BULK)) && c->readlen != -1 && - (unsigned)c->readlen <= sdslen(c->ibuf)) - { - // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n", - // c->mbulk,c->readlen,sdslen(c->ibuf)); - if (c->replytype == REPLY_BULK) { - clientDone(c); - } else if (c->replytype == REPLY_MBULK) { - // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen); - // fwrite(c->ibuf,c->readlen,1,stdout); - // printf("\n"); - if (--c->mbulk == 0) { - clientDone(c); + + int canconsume; + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + } + + if (c->readlen == 0) + goto done; + } else if (c->replytype == REPLY_MBULK) { + int advance = 0; + if (c->mbulk == -1) { + advance = readLen(c->ibuf+pos,&c->mbulk); + if (advance) { + pos += advance; + if (c->mbulk == -1) + goto done; + } else { + goto skip; + } + } + + int canconsume; + while(c->mbulk > 0 && pos < len) { + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + if (c->readlen == 0) + c->mbulk--; } else { - c->ibuf = sdsrange(c->ibuf,c->readlen,-1); - c->readlen = -1; - goto processdata; + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + c->mbulk--; + continue; + } else { + /* include the trailing \r\n */ + c->readlen += 2; + } + } else { + goto skip; + } } } + + if (c->mbulk == 0) + goto done; } + +skip: + c->ibuf = sdsrange(c->ibuf,pos,-1); + return; +done: + clientDone(c); + return; } static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) @@ -371,13 +390,13 @@ static void createMissingClients(client c) { } } -static void showLatencyReport(char *title) { +static void showLatencyReport(void) { int j, seen = 0; float perc, reqpersec; reqpersec = (float)config.donerequests/((float)config.totlatency/1000); if (!config.quiet) { - printf("====== %s ======\n", title); + printf("====== %s ======\n", config.title); printf(" %d requests completed in %.2f seconds\n", config.donerequests, (float)config.totlatency/1000); printf(" %d parallel clients\n", config.numclients); @@ -393,20 +412,20 @@ static void showLatencyReport(char *title) { } printf("%.2f requests per second\n\n", reqpersec); } else { - printf("%s: %.2f requests per second\n", title, reqpersec); + printf("%s: %.2f requests per second\n", config.title, reqpersec); } } -static void prepareForBenchmark(void) -{ +static void prepareForBenchmark(char *title) { memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1)); + config.title = title; config.start = mstime(); config.donerequests = 0; } -static void endBenchmark(char *title) { +static void endBenchmark(void) { config.totlatency = mstime()-config.start; - showLatencyReport(title); + showLatencyReport(); freeAllClients(); } @@ -480,6 +499,18 @@ void parseOptions(int argc, char **argv) { } } +int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) { + REDIS_NOTUSED(eventLoop); + REDIS_NOTUSED(id); + REDIS_NOTUSED(clientData); + + float dt = (float)(mstime()-config.start)/1000.0; + float rps = (float)config.donerequests/dt; + printf("%s: %.2f\r", config.title, rps); + fflush(stdout); + return 250; /* every 250ms */ +} + int main(int argc, char **argv) { client c; @@ -491,6 +522,7 @@ int main(int argc, char **argv) { config.requests = 10000; config.liveclients = 0; config.el = aeCreateEventLoop(); + aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL); config.keepalive = 1; config.donerequests = 0; config.datasize = 3; @@ -514,7 +546,7 @@ int main(int argc, char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - prepareForBenchmark(); + prepareForBenchmark("IDLE"); c = createClient(); if (!c) exit(1); c->obuf = sdsempty(); @@ -525,25 +557,25 @@ int main(int argc, char **argv) { } do { - prepareForBenchmark(); + prepareForBenchmark("PING"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"PING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("PING (multi bulk)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING (multi bulk)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SET"); c = createClient(); if (!c) exit(1); c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize); @@ -557,106 +589,106 @@ int main(int argc, char **argv) { prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("GET"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("GET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("INCR"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("INCR"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPOP mylist\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SADD"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SADD"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SPOP myset\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("SPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH (again, in order to bench LRANGE)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 100 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 100 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 300 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 300 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 450 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 450 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 600 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 600 elements)"); + endBenchmark(); printf("\n"); } while(config.loop); diff --git a/src/redis.c b/src/redis.c index b6b425213b48042c2335bc4649b59ba9ac484c7f..66e088b0068a080b52705719c755b416a72da742 100644 --- a/src/redis.c +++ b/src/redis.c @@ -909,7 +909,7 @@ int processCommand(redisClient *c) { } else if (c->multibulk) { if (c->bulklen == -1) { if (((char*)c->argv[0]->ptr)[0] != '$') { - addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n")); + addReplyError(c,"multi bulk protocol error"); resetClient(c); return 1; } else { @@ -922,7 +922,7 @@ int processCommand(redisClient *c) { bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@ -975,17 +975,14 @@ int processCommand(redisClient *c) { * such wrong arity, bad command name and so forth. */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - addReplySds(c, - sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n", - (char*)c->argv[0]->ptr)); + addReplyErrorFormat(c,"unknown command '%s'", + (char*)c->argv[0]->ptr); resetClient(c); return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { - addReplySds(c, - sdscatprintf(sdsempty(), - "-ERR wrong number of arguments for '%s' command\r\n", - cmd->name)); + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + cmd->name); resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { @@ -999,7 +996,7 @@ int processCommand(redisClient *c) { bulklen < 0 || bulklen > 1024*1024*1024) { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@ -1026,7 +1023,7 @@ int processCommand(redisClient *c) { /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { - addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + addReplyError(c,"operation not permitted"); resetClient(c); return 1; } @@ -1035,7 +1032,7 @@ int processCommand(redisClient *c) { if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && zmalloc_used_memory() > server.maxmemory) { - addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + addReplyError(c,"command not allowed when used memory > 'maxmemory'"); resetClient(c); return 1; } @@ -1045,7 +1042,7 @@ int processCommand(redisClient *c) { && cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { - addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); + addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); resetClient(c); return 1; } @@ -1109,7 +1106,7 @@ void authCommand(redisClient *c) { addReply(c,shared.ok); } else { c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + addReplyError(c,"invalid password"); } } diff --git a/src/redis.h b/src/redis.h index 9e27d724854e1788199286f83e9437c81b7df954..1ef5628872b771d0f79e0321658eb110b6c0b806 100644 --- a/src/redis.h +++ b/src/redis.h @@ -47,6 +47,7 @@ #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ #define REDIS_SHARED_INTEGERS 10000 +#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ #define REDIS_WRITEV_THRESHOLD 3 @@ -309,6 +310,11 @@ typedef struct redisClient { list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + + /* Response buffer */ + int bufpos; + int buflen; + char buf[]; } redisClient; struct saveparam { @@ -588,6 +594,8 @@ void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); +void *addDeferredMultiBulkLength(redisClient *c); +void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@ -597,11 +605,23 @@ void addReplyBulkCString(redisClient *c, char *s); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); +void addReplyError(redisClient *c, char *err); +void addReplyStatus(redisClient *c, char *status); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); -void addReplyUlong(redisClient *c, unsigned long ul); +void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +#ifdef __GNUC__ +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +#else +void addReplyErrorFormat(redisClient *c, const char *fmt, ...); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...); +#endif + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); diff --git a/src/replication.c b/src/replication.c index c28460885ecf46c0fd3b4832adffcae254d57f8c..8c629006a14b7b02374ef64f8e6adcb72f664846 100644 --- a/src/replication.c +++ b/src/replication.c @@ -179,7 +179,7 @@ void syncCommand(redisClient *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { - addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n")); + addReplyError(c,"Can't SYNC while not connected with my master"); return; } @@ -188,7 +188,7 @@ void syncCommand(redisClient *c) { * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { - addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); + addReplyError(c,"SYNC is invalid with pending input"); return; } @@ -226,7 +226,7 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); + addReplyError(c,"Unable to perform background save"); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; diff --git a/src/sds.c b/src/sds.c index a0ebb0591ee17ac9c181a5a89db1a920753227b0..2f3ffedcbf1d3ff8915ca033923e3fdbfa7269d2 100644 --- a/src/sds.c +++ b/src/sds.c @@ -33,7 +33,6 @@ #include "sds.h" #include #include -#include #include #include #include "zmalloc.h" @@ -156,8 +155,8 @@ sds sdscpy(sds s, char *t) { return sdscpylen(s, t, strlen(t)); } -sds sdscatprintf(sds s, const char *fmt, ...) { - va_list ap; +sds sdscatvprintf(sds s, const char *fmt, va_list ap) { + va_list cpy; char *buf, *t; size_t buflen = 16; @@ -169,9 +168,8 @@ sds sdscatprintf(sds s, const char *fmt, ...) { if (buf == NULL) return NULL; #endif buf[buflen-2] = '\0'; - va_start(ap, fmt); - vsnprintf(buf, buflen, fmt, ap); - va_end(ap); + va_copy(cpy,ap); + vsnprintf(buf, buflen, fmt, cpy); if (buf[buflen-2] != '\0') { zfree(buf); buflen *= 2; @@ -184,6 +182,15 @@ sds sdscatprintf(sds s, const char *fmt, ...) { return t; } +sds sdscatprintf(sds s, const char *fmt, ...) { + va_list ap; + char *t; + va_start(ap, fmt); + t = sdscatvprintf(s,fmt,ap); + va_end(ap); + return t; +} + sds sdstrim(sds s, const char *cset) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); char *start, *end, *sp, *ep; diff --git a/src/sds.h b/src/sds.h index a0e224f5ada8cc805478e8f863e4dc9dad5519c6..ae0f84fb5c4ec239698d09b9e167ccfce6dbc946 100644 --- a/src/sds.h +++ b/src/sds.h @@ -32,6 +32,7 @@ #define __SDS_H #include +#include typedef char *sds; @@ -53,6 +54,7 @@ sds sdscat(sds s, char *t); sds sdscpylen(sds s, char *t, size_t len); sds sdscpy(sds s, char *t); +sds sdscatvprintf(sds s, const char *fmt, va_list ap); #ifdef __GNUC__ sds sdscatprintf(sds s, const char *fmt, ...) __attribute__((format(printf, 2, 3))); diff --git a/src/sort.c b/src/sort.c index aa1ce929399dc64bfdbcccda95f1db3ae1b2c9c6..79f7901054c0971e6c52addfbb126bd53afb1854 100644 --- a/src/sort.c +++ b/src/sort.c @@ -307,7 +307,7 @@ void sortCommand(redisClient *c) { outputlen = getop ? getop*(end-start+1) : end-start+1; if (storekey == NULL) { /* STORE option not specified, sent the sorting result to client */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); + addReplyMultiBulkLen(c,outputlen); for (j = start; j <= end; j++) { listNode *ln; listIter li; @@ -369,7 +369,7 @@ void sortCommand(redisClient *c) { * replaced. */ server.dirty += 1+outputlen; touchWatchedKey(c->db,storekey); - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen)); + addReplyLongLong(c,outputlen); } /* Cleanup */ diff --git a/src/t_hash.c b/src/t_hash.c index b6be284fa12d58bce12ccc8808c7a481f20453e2..5cef1cabbcd86addfd6133fcbd2c6261e2750ae5 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -249,7 +249,7 @@ void hmsetCommand(redisClient *c) { robj *o; if ((c->argc % 2) == 1) { - addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n")); + addReplyError(c,"wrong number of arguments for HMSET"); return; } @@ -315,7 +315,7 @@ void hmgetCommand(redisClient *c) { /* Note the check for o != NULL happens inside the loop. This is * done because objects that cannot be found are considered to be * an empty hash. The reply should then be a series of NULLs. */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2)); + addReplyMultiBulkLen(c,c->argc-2); for (i = 2; i < c->argc; i++) { if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) { addReplyBulk(c,value); @@ -346,21 +346,19 @@ void hlenCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReplyUlong(c,hashTypeLength(o)); + addReplyLongLong(c,hashTypeLength(o)); } void genericHgetallCommand(redisClient *c, int flags) { - robj *o, *lenobj, *obj; + robj *o, *obj; unsigned long count = 0; hashTypeIterator *hi; + void *replylen = NULL; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_HASH)) return; - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - + replylen = addDeferredMultiBulkLength(c); hi = hashTypeInitIterator(o); while (hashTypeNext(hi) != REDIS_ERR) { if (flags & REDIS_HASH_KEY) { @@ -377,8 +375,7 @@ void genericHgetallCommand(redisClient *c, int flags) { } } hashTypeReleaseIterator(hi); - - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); + setDeferredMultiBulkLength(c,replylen,count); } void hkeysCommand(redisClient *c) { diff --git a/src/t_list.c b/src/t_list.c index add1bee167691b1fcc382c1292c728832f6c65bd..41d651f64543d144bc528f312adf2f61d63f7372 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -342,7 +342,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { server.dirty++; } - addReplyUlong(c,listTypeLength(subject)); + addReplyLongLong(c,listTypeLength(subject)); } void lpushxCommand(redisClient *c) { @@ -366,7 +366,7 @@ void linsertCommand(redisClient *c) { void llenCommand(redisClient *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); if (o == NULL || checkType(c,o,REDIS_LIST)) return; - addReplyUlong(c,listTypeLength(o)); + addReplyLongLong(c,listTypeLength(o)); } void lindexCommand(redisClient *c) { @@ -494,7 +494,7 @@ void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + addReplyMultiBulkLen(c,rangelen); listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { redisAssert(listTypeNext(li,&entry)); @@ -594,7 +594,7 @@ void lremCommand(redisClient *c) { decrRefCount(obj); if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); + addReplyLongLong(c,removed); if (removed) touchWatchedKey(c->db,c->argv[1]); } @@ -772,7 +772,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplySds(receiver,sdsnew("*2\r\n")); + addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); unblockClientWaitingData(receiver); @@ -792,7 +792,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { /* Make sure the timeout is not negative */ if (lltimeout < 0) { - addReplySds(c,sdsnew("-ERR timeout is negative\r\n")); + addReplyError(c,"timeout is negative"); return; } @@ -822,7 +822,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { * "real" command will add the last element (the value) * for us. If this souds like an hack to you it's just * because it is... */ - addReplySds(c,sdsnew("*2\r\n")); + addReplyMultiBulkLen(c,2); addReplyBulk(c,argv[1]); popGenericCommand(c,where); diff --git a/src/t_set.c b/src/t_set.c index 68e132278f96c938e605490e6ed55aff46177ef8..e2ac5ae5378155f329f76d90d7f5c6680e959f05 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -276,7 +276,7 @@ void scardCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - addReplyUlong(c,setTypeSize(o)); + addReplyLongLong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { @@ -320,7 +320,8 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *lenobj = NULL, *dstset = NULL; + robj *ele, *dstset = NULL; + void *replylen = NULL; unsigned long j, cardinality = 0; for (j = 0; j < setnum; j++) { @@ -356,9 +357,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ @@ -400,7 +399,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, touchWatchedKey(c->db,dstkey); server.dirty++; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); + setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } @@ -470,7 +469,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); while((ele = setTypeNext(si)) != NULL) { addReplyBulk(c,ele); diff --git a/src/t_string.c b/src/t_string.c index 3b8a39bbec7100d20622411c2376eda49565a2fa..509c630a49540163980ef3aba678a6d9efbb9623 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -12,7 +12,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) return; if (seconds <= 0) { - addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + addReplyError(c,"invalid expire time in SETEX"); return; } } @@ -79,7 +79,7 @@ void getsetCommand(redisClient *c) { void mgetCommand(redisClient *c) { int j; - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); + addReplyMultiBulkLen(c,c->argc-1); for (j = 1; j < c->argc; j++) { robj *o = lookupKeyRead(c->db,c->argv[j]); if (o == NULL) { @@ -98,7 +98,7 @@ void msetGenericCommand(redisClient *c, int nx) { int j, busykeys = 0; if ((c->argc % 2) == 0) { - addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n")); + addReplyError(c,"wrong number of arguments for MSET"); return; } /* Handle the NX flag. The MSETNX semantic is to return zero and don't @@ -211,7 +211,7 @@ void appendCommand(redisClient *c) { } touchWatchedKey(c->db,c->argv[1]); server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen)); + addReplyLongLong(c,totlen); } void substrCommand(redisClient *c) { diff --git a/src/t_zset.c b/src/t_zset.c index e93e5c406a5cb2088b134aeea7cbc249995ac83d..d944e92329fca9d8f8f0033216dbf7f0ae631706 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -355,8 +355,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i *score = scoreval; } if (isnan(*score)) { - addReplySds(c, - sdsnew("-ERR resulting score is not a number (NaN)\r\n")); + addReplyError(c,"resulting score is not a number (NaN)"); zfree(score); /* Note that we don't need to check if the zset may be empty and * should be removed here, as we can only obtain Nan as score if @@ -561,7 +560,8 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* expect setnum input keys to be given */ setnum = atoi(c->argv[2]->ptr); if (setnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); + addReplyError(c, + "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); return; } @@ -782,8 +782,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { } /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n", - withscores ? (rangelen*2) : rangelen)); + addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen); for (j = 0; j < rangelen; j++) { ele = ln->obj; addReplyBulk(c,ele); @@ -840,8 +839,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) badsyntax = 1; if (badsyntax) { - addReplySds(c, - sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); + addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE"); return; } @@ -866,7 +864,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { zset *zsetobj = o->ptr; zskiplist *zsl = zsetobj->zsl; zskiplistNode *ln; - robj *ele, *lenobj = NULL; + robj *ele; + void *replylen = NULL; unsigned long rangelen = 0; /* Get the first node with the score >= min, or with @@ -884,11 +883,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { * are in the list, so we push this object that will represent * the multi-bulk length in the output buffer, and will "fix" * it later */ - if (!justcount) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - } + if (!justcount) + replylen = addDeferredMultiBulkLength(c); while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { if (offset) { @@ -910,7 +906,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (justcount) { addReplyLongLong(c,(long)rangelen); } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", + setDeferredMultiBulkLength(c,replylen, withscores ? (rangelen*2) : rangelen); } } @@ -933,7 +929,7 @@ void zcardCommand(redisClient *c) { checkType(c,o,REDIS_ZSET)) return; zs = o->ptr; - addReplyUlong(c,zs->zsl->length); + addReplyLongLong(c,zs->zsl->length); } void zscoreCommand(redisClient *c) {