提交 89f9f837 编写于 作者: A antirez

Merge remote branch 'pietern/networking-perf'

......@@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) {
c->querybuf = sdsempty();
c->argc = 0;
c->argv = NULL;
c->bufpos = 0;
c->flags = 0;
/* We set the fake client as a slave waiting for the synchronization
* so that Redis will not try to send replies to this client. */
......@@ -272,12 +273,14 @@ int loadAppendOnlyFile(char *filename) {
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
/* Discard the reply objects list from the fake client */
while(listLength(fakeClient->reply))
listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
/* The fake client should not have a reply */
redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
/* Clean up, ready for the next command */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
/* Handle swapping while loading big datasets when VM is on */
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
......@@ -629,12 +632,11 @@ int rewriteAppendOnlyFileBackground(void) {
void bgrewriteaofCommand(redisClient *c) {
if (server.bgrewritechildpid != -1) {
addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
addReplyError(c,"Background append only file rewriting already in progress");
return;
}
if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
char *status = "+Background append only file rewriting started\r\n";
addReplySds(c,sdsnew(status));
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
}
......
......@@ -270,8 +270,8 @@ void configSetCommand(redisClient *c) {
stopAppendOnly();
} else {
if (startAppendOnly() == REDIS_ERR) {
addReplySds(c,sdscatprintf(sdsempty(),
"-ERR Unable to turn on AOF. Check server logs.\r\n"));
addReplyError(c,
"Unable to turn on AOF. Check server logs.");
decrRefCount(o);
return;
}
......@@ -312,9 +312,8 @@ void configSetCommand(redisClient *c) {
}
sdsfreesplitres(v,vlen);
} else {
addReplySds(c,sdscatprintf(sdsempty(),
"-ERR not supported CONFIG parameter %s\r\n",
(char*)c->argv[2]->ptr));
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
(char*)c->argv[2]->ptr);
decrRefCount(o);
return;
}
......@@ -323,22 +322,18 @@ void configSetCommand(redisClient *c) {
return;
badfmt: /* Bad format errors */
addReplySds(c,sdscatprintf(sdsempty(),
"-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'",
(char*)o->ptr,
(char*)c->argv[2]->ptr));
(char*)c->argv[2]->ptr);
decrRefCount(o);
}
void configGetCommand(redisClient *c) {
robj *o = getDecodedObject(c->argv[2]);
robj *lenobj = createObject(REDIS_STRING,NULL);
void *replylen = addDeferredMultiBulkLength(c);
char *pattern = o->ptr;
int matches = 0;
addReply(c,lenobj);
decrRefCount(lenobj);
if (stringmatch(pattern,"dbfilename",0)) {
addReplyBulkCString(c,"dbfilename");
addReplyBulkCString(c,server.dbfilename);
......@@ -410,7 +405,7 @@ void configGetCommand(redisClient *c) {
matches++;
}
decrRefCount(o);
lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
setDeferredMultiBulkLength(c,replylen,matches*2);
}
void configCommand(redisClient *c) {
......@@ -428,13 +423,12 @@ void configCommand(redisClient *c) {
server.stat_starttime = time(NULL);
addReply(c,shared.ok);
} else {
addReplySds(c,sdscatprintf(sdsempty(),
"-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
addReplyError(c,
"CONFIG subcommand must be one of GET, SET, RESETSTAT");
}
return;
badarity:
addReplySds(c,sdscatprintf(sdsempty(),
"-ERR Wrong number of arguments for CONFIG %s\r\n",
(char*) c->argv[1]->ptr));
addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s",
(char*) c->argv[1]->ptr);
}
......@@ -204,7 +204,7 @@ void selectCommand(redisClient *c) {
int id = atoi(c->argv[1]->ptr);
if (selectDb(c,id) == REDIS_ERR) {
addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
addReplyError(c,"invalid DB index");
} else {
addReply(c,shared.ok);
}
......@@ -228,11 +228,9 @@ void keysCommand(redisClient *c) {
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
robj *lenobj = createObject(REDIS_STRING,NULL);
void *replylen = addDeferredMultiBulkLength(c);
di = dictGetIterator(c->db->dict);
addReply(c,lenobj);
decrRefCount(lenobj);
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
while((de = dictNext(di)) != NULL) {
sds key = dictGetEntryKey(de);
......@@ -248,17 +246,15 @@ void keysCommand(redisClient *c) {
}
}
dictReleaseIterator(di);
lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys);
setDeferredMultiBulkLength(c,replylen,numkeys);
}
void dbsizeCommand(redisClient *c) {
addReplySds(c,
sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
addReplyLongLong(c,dictSize(c->db->dict));
}
void lastsaveCommand(redisClient *c) {
addReplySds(c,
sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
addReplyLongLong(c,server.lastsave);
}
void typeCommand(redisClient *c) {
......@@ -267,24 +263,23 @@ void typeCommand(redisClient *c) {
o = lookupKeyRead(c->db,c->argv[1]);
if (o == NULL) {
type = "+none";
type = "none";
} else {
switch(o->type) {
case REDIS_STRING: type = "+string"; break;
case REDIS_LIST: type = "+list"; break;
case REDIS_SET: type = "+set"; break;
case REDIS_ZSET: type = "+zset"; break;
case REDIS_HASH: type = "+hash"; break;
default: type = "+unknown"; break;
case REDIS_STRING: type = "string"; break;
case REDIS_LIST: type = "list"; break;
case REDIS_SET: type = "set"; break;
case REDIS_ZSET: type = "zset"; break;
case REDIS_HASH: type = "hash"; break;
default: type = "unknown"; break;
}
}
addReplySds(c,sdsnew(type));
addReply(c,shared.crlf);
addReplyStatus(c,type);
}
void saveCommand(redisClient *c) {
if (server.bgsavechildpid != -1) {
addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSave(server.dbfilename) == REDIS_OK) {
......@@ -296,12 +291,11 @@ void saveCommand(redisClient *c) {
void bgsaveCommand(redisClient *c) {
if (server.bgsavechildpid != -1) {
addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
addReplyError(c,"Background save already in progress");
return;
}
if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
char *status = "+Background saving started\r\n";
addReplySds(c,sdsnew(status));
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
......@@ -310,7 +304,7 @@ void bgsaveCommand(redisClient *c) {
void shutdownCommand(redisClient *c) {
if (prepareForShutdown() == REDIS_OK)
exit(0);
addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n"));
addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
void renameGenericCommand(redisClient *c, int nx) {
......
......@@ -211,18 +211,18 @@ void debugCommand(redisClient *c) {
char *strenc;
strenc = strEncoding(val->encoding);
addReplySds(c,sdscatprintf(sdsempty(),
"+Value at:%p refcount:%d "
"encoding:%s serializedlength:%lld\r\n",
addReplyStatusFormat(c,
"Value at:%p refcount:%d "
"encoding:%s serializedlength:%lld",
(void*)val, val->refcount,
strenc, (long long) rdbSavedObjectLen(val,NULL)));
strenc, (long long) rdbSavedObjectLen(val,NULL));
} else {
vmpointer *vp = (vmpointer*) val;
addReplySds(c,sdscatprintf(sdsempty(),
"+Value swapped at: page %llu "
"using %llu pages\r\n",
addReplyStatusFormat(c,
"Value swapped at: page %llu "
"using %llu pages",
(unsigned long long) vp->page,
(unsigned long long) vp->usedpages));
(unsigned long long) vp->usedpages);
}
} else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
lookupKeyRead(c->db,c->argv[2]);
......@@ -233,7 +233,7 @@ void debugCommand(redisClient *c) {
vmpointer *vp;
if (!server.vm_enabled) {
addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
addReplyError(c,"Virtual Memory is disabled");
return;
}
if (!de) {
......@@ -243,9 +243,9 @@ void debugCommand(redisClient *c) {
val = dictGetEntryVal(de);
/* Swap it */
if (val->storage != REDIS_VM_MEMORY) {
addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
addReplyError(c,"This key is not in memory");
} else if (val->refcount != 1) {
addReplySds(c,sdsnew("-ERR Object is shared\r\n"));
addReplyError(c,"Object is shared");
} else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
dictGetEntryVal(de) = vp;
addReply(c,shared.ok);
......@@ -274,18 +274,17 @@ void debugCommand(redisClient *c) {
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
unsigned char digest[20];
sds d = sdsnew("+");
sds d = sdsempty();
int j;
computeDatasetDigest(digest);
for (j = 0; j < 20; j++)
d = sdscatprintf(d, "%02x",digest[j]);
d = sdscatlen(d,"\r\n",2);
addReplySds(c,d);
addReplyStatus(c,d);
sdsfree(d);
} else {
addReplySds(c,sdsnew(
"-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
addReplyError(c,
"Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]");
}
}
......
......@@ -42,7 +42,7 @@ void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
void multiCommand(redisClient *c) {
if (c->flags & REDIS_MULTI) {
addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n"));
addReplyError(c,"MULTI calls can not be nested");
return;
}
c->flags |= REDIS_MULTI;
......@@ -51,7 +51,7 @@ void multiCommand(redisClient *c) {
void discardCommand(redisClient *c) {
if (!(c->flags & REDIS_MULTI)) {
addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n"));
addReplyError(c,"DISCARD without MULTI");
return;
}
......@@ -82,7 +82,7 @@ void execCommand(redisClient *c) {
int orig_argc;
if (!(c->flags & REDIS_MULTI)) {
addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
addReplyError(c,"EXEC without MULTI");
return;
}
......@@ -107,7 +107,7 @@ void execCommand(redisClient *c) {
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
orig_argc = c->argc;
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
addReplyMultiBulkLen(c,c->mstate.count);
for (j = 0; j < c->mstate.count; j++) {
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
......@@ -251,7 +251,7 @@ void watchCommand(redisClient *c) {
int j;
if (c->flags & REDIS_MULTI) {
addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n"));
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
for (j = 1; j < c->argc; j++)
......
#include "redis.h"
#include <sys/uio.h>
void *dupClientReplyValue(void *o) {
......@@ -12,7 +11,12 @@ int listMatchObjects(void *a, void *b) {
}
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(*c));
redisClient *c;
/* Allocate more space to hold a static write buffer. */
c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES);
c->buflen = REDIS_REPLY_CHUNK_BYTES;
c->bufpos = 0;
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
......@@ -56,70 +60,238 @@ redisClient *createClient(int fd) {
return c;
}
void addReply(redisClient *c, robj *obj) {
if (listLength(c->reply) == 0 &&
int _ensureFileEvent(redisClient *c) {
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return;
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
/* Create a duplicate of the last object in the reply list when
* it is not exclusively owned by the reply list. */
robj *dupLastObjectIfNeeded(list *reply) {
robj *new, *cur;
listNode *ln;
redisAssert(listLength(reply) > 0);
ln = listLast(reply);
cur = listNodeValue(ln);
if (cur->refcount > 1) {
new = dupStringObject(cur);
decrRefCount(cur);
listNodeValue(ln) = new;
}
return listNodeValue(ln);
}
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = c->buflen-c->bufpos;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return REDIS_ERR;
/* Check that the buffer has enough space available for this string. */
if (len > available) return REDIS_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return REDIS_OK;
}
void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
} else {
tail = listNodeValue(listLast(c->reply));
/* Append to this object when possible. */
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o);
}
}
}
/* This method takes responsibility over the sds. When it is no longer
* needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(redisClient *c, sds s) {
robj *tail;
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
} else {
tail = listNodeValue(listLast(c->reply));
/* Append to this object when possible. */
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
{
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
sdsfree(s);
} else {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
}
}
}
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
robj *tail;
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createStringObject(s,len));
} else {
tail = listNodeValue(listLast(c->reply));
/* Append to this object when possible. */
if (tail->ptr != NULL &&
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
{
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,len);
} else {
listAddNodeTail(c->reply,createStringObject(s,len));
}
}
}
void addReply(redisClient *c, robj *obj) {
if (_ensureFileEvent(c) != REDIS_OK) return;
if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
/* Returns a new object with refcount 1 */
obj = dupStringObject(obj);
obj->refcount = 0; /* getDecodedObject() will increment the refcount */
} else {
/* This increments the refcount. */
obj = getDecodedObject(obj);
}
listAddNodeTail(c->reply,getDecodedObject(obj));
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
decrRefCount(obj);
}
void addReplySds(redisClient *c, sds s) {
robj *o = createObject(REDIS_STRING,s);
addReply(c,o);
decrRefCount(o);
if (_ensureFileEvent(c) != REDIS_OK) {
/* The caller expects the sds to be free'd. */
sdsfree(s);
return;
}
if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
sdsfree(s);
} else {
/* This method free's the sds when it is no longer needed. */
_addReplySdsToList(c,s);
}
}
void addReplyDouble(redisClient *c, double d) {
char buf[128];
void addReplyString(redisClient *c, char *s, size_t len) {
if (_ensureFileEvent(c) != REDIS_OK) return;
if (_addReplyToBuffer(c,s,len) != REDIS_OK)
_addReplyStringToList(c,s,len);
}
snprintf(buf,sizeof(buf),"%.17g",d);
addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
(unsigned long) strlen(buf),buf));
void _addReplyError(redisClient *c, char *s, size_t len) {
addReplyString(c,"-ERR ",5);
addReplyString(c,s,len);
addReplyString(c,"\r\n",2);
}
void addReplyLongLong(redisClient *c, long long ll) {
char buf[128];
size_t len;
void addReplyError(redisClient *c, char *err) {
_addReplyError(c,err,strlen(err));
}
if (ll == 0) {
addReply(c,shared.czero);
return;
} else if (ll == 1) {
addReply(c,shared.cone);
return;
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
_addReplyError(c,s,sdslen(s));
sdsfree(s);
}
void _addReplyStatus(redisClient *c, char *s, size_t len) {
addReplyString(c,"+",1);
addReplyString(c,s,len);
addReplyString(c,"\r\n",2);
}
void addReplyStatus(redisClient *c, char *status) {
_addReplyStatus(c,status,strlen(status));
}
void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
_addReplyStatus(c,s,sdslen(s));
sdsfree(s);
}
/* Adds an empty object to the reply list that will contain the multi bulk
* length, which is not known when this function is called. */
void *addDeferredMultiBulkLength(redisClient *c) {
if (_ensureFileEvent(c) != REDIS_OK) return NULL;
listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
return listLast(c->reply);
}
/* Populate the length object and try glueing it to the next chunk. */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
listNode *ln = (listNode*)node;
robj *len, *next;
/* Abort when *node is NULL (see addDeferredMultiBulkLength). */
if (node == NULL) return;
len = listNodeValue(ln);
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
if (ln->next != NULL) {
next = listNodeValue(ln->next);
/* Only glue when the next node is non-NULL (an sds in this case) */
if (next->ptr != NULL) {
len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
listDelNode(c->reply,ln->next);
}
}
buf[0] = ':';
}
void addReplyDouble(redisClient *c, double d) {
char dbuf[128], sbuf[128];
int dlen, slen;
dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
addReplyString(c,sbuf,slen);
}
void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
char buf[128];
int len;
buf[0] = prefix;
len = ll2string(buf+1,sizeof(buf)-1,ll);
buf[len+1] = '\r';
buf[len+2] = '\n';
addReplySds(c,sdsnewlen(buf,len+3));
addReplyString(c,buf,len+3);
}
void addReplyUlong(redisClient *c, unsigned long ul) {
char buf[128];
size_t len;
void addReplyLongLong(redisClient *c, long long ll) {
_addReplyLongLong(c,ll,':');
}
if (ul == 0) {
addReply(c,shared.czero);
return;
} else if (ul == 1) {
addReply(c,shared.cone);
return;
}
len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
addReplySds(c,sdsnewlen(buf,len));
void addReplyMultiBulkLen(redisClient *c, long length) {
_addReplyLongLong(c,length,'*');
}
void addReplyBulkLen(redisClient *c, robj *obj) {
size_t len, intlen;
char buf[128];
size_t len;
if (obj->encoding == REDIS_ENCODING_RAW) {
len = sdslen(obj->ptr);
......@@ -136,11 +308,7 @@ void addReplyBulkLen(redisClient *c, robj *obj) {
len++;
}
}
buf[0] = '$';
intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
buf[intlen+1] = '\r';
buf[intlen+2] = '\n';
addReplySds(c,sdsnewlen(buf,intlen+3));
_addReplyLongLong(c,len,'$');
}
void addReplyBulk(redisClient *c, robj *obj) {
......@@ -290,34 +458,6 @@ void freeClient(redisClient *c) {
zfree(c);
}
#define GLUEREPLY_UP_TO (1024)
static void glueReplyBuffersIfNeeded(redisClient *c) {
int copylen = 0;
char buf[GLUEREPLY_UP_TO];
listNode *ln;
listIter li;
robj *o;
listRewind(c->reply,&li);
while((ln = listNext(&li))) {
int objlen;
o = ln->value;
objlen = sdslen(o->ptr);
if (copylen + objlen <= GLUEREPLY_UP_TO) {
memcpy(buf+copylen,o->ptr,objlen);
copylen += objlen;
listDelNode(c->reply,ln);
} else {
if (copylen == 0) return;
break;
}
}
/* Now the output buffer is empty, add the new single element */
o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
listAddNodeHead(c->reply,o);
}
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
......@@ -334,31 +474,48 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
while(listLength(c->reply)) {
if (server.glueoutputbuf && listLength(c->reply) > 1)
glueReplyBuffersIfNeeded(c);
while(c->bufpos > 0 || listLength(c->reply)) {
if (c->bufpos > 0) {
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = c->bufpos - c->sentlen;
} else {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if (c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = objlen - c->sentlen;
} else {
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten;
if (c->flags & REDIS_MASTER) {
/* Don't reply to a master */
nwritten = objlen - c->sentlen;
} else {
nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
}
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
}
}
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
......
......@@ -354,9 +354,9 @@ int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const ch
double value;
if (getDoubleFromObject(o, &value) != REDIS_OK) {
if (msg != NULL) {
addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
addReplyError(c,(char*)msg);
} else {
addReplySds(c, sdsnew("-ERR value is not a double\r\n"));
addReplyError(c,"value is not a double");
}
return REDIS_ERR;
}
......@@ -393,9 +393,9 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con
long long value;
if (getLongLongFromObject(o, &value) != REDIS_OK) {
if (msg != NULL) {
addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
addReplyError(c,(char*)msg);
} else {
addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n"));
addReplyError(c,"value is not an integer or out of range");
}
return REDIS_ERR;
}
......@@ -410,9 +410,9 @@ int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *
if (getLongLongFromObjectOrReply(c, o, &value, msg) != REDIS_OK) return REDIS_ERR;
if (value < LONG_MIN || value > LONG_MAX) {
if (msg != NULL) {
addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
addReplyError(c,(char*)msg);
} else {
addReplySds(c, sdsnew("-ERR value is out of range\r\n"));
addReplyError(c,"value is out of range");
}
return REDIS_ERR;
}
......
......@@ -75,6 +75,7 @@ static struct config {
long long start;
long long totlatency;
int *latency;
char *title;
list *clients;
int quiet;
int loop;
......@@ -206,16 +207,27 @@ static void clientDone(client c) {
}
}
/* Read a length from the buffer pointed to by *p, store the length in *len,
* and return the number of bytes that the cursor advanced. */
static int readLen(char *p, int *len) {
char *tail = strstr(p,"\r\n");
if (tail == NULL)
return 0;
*tail = '\0';
*len = atoi(p+1);
return tail+2-p;
}
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
{
char buf[1024];
int nread;
char buf[1024], *p;
int nread, pos=0, len=0;
client c = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(fd);
REDIS_NOTUSED(mask);
nread = read(c->fd, buf, 1024);
nread = read(c->fd,buf,sizeof(buf));
if (nread == -1) {
fprintf(stderr, "Reading from socket: %s\n", strerror(errno));
freeClient(c);
......@@ -228,82 +240,89 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
}
c->totreceived += nread;
c->ibuf = sdscatlen(c->ibuf,buf,nread);
len = sdslen(c->ibuf);
processdata:
/* Are we waiting for the first line of the command of for sdf
* count in bulk or multi bulk operations? */
if (c->replytype == REPLY_INT ||
c->replytype == REPLY_RETCODE ||
(c->replytype == REPLY_BULK && c->readlen == -1) ||
(c->replytype == REPLY_MBULK && c->readlen == -1) ||
(c->replytype == REPLY_MBULK && c->mbulk == -1)) {
char *p;
/* Check if the first line is complete. This is only true if
* there is a newline inside the buffer. */
if ((p = strchr(c->ibuf,'\n')) != NULL) {
if (c->replytype == REPLY_BULK ||
(c->replytype == REPLY_MBULK && c->mbulk != -1))
{
/* Read the count of a bulk reply (being it a single bulk or
* a multi bulk reply). "$<count>" for the protocol spec. */
*p = '\0';
*(p-1) = '\0';
c->readlen = atoi(c->ibuf+1)+2;
// printf("BULK ATOI: %s\n", c->ibuf+1);
/* Handle null bulk reply "$-1" */
if (c->readlen-2 == -1) {
clientDone(c);
return;
}
/* Leave all the rest in the input buffer */
c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
/* fall through to reach the point where the code will try
* to check if the bulk reply is complete. */
} else if (c->replytype == REPLY_MBULK && c->mbulk == -1) {
/* Read the count of a multi bulk reply. That is, how many
* bulk replies we have to read next. "*<count>" protocol. */
*p = '\0';
*(p-1) = '\0';
c->mbulk = atoi(c->ibuf+1);
/* Handle null bulk reply "*-1" */
if (c->mbulk == -1) {
clientDone(c);
return;
c->replytype == REPLY_RETCODE)
{
/* Check if the first line is complete. This is everything we need
* when waiting for an integer or status code reply.*/
if ((p = strstr(c->ibuf,"\r\n")) != NULL)
goto done;
} else if (c->replytype == REPLY_BULK) {
int advance = 0;
if (c->readlen < 0) {
advance = readLen(c->ibuf+pos,&c->readlen);
if (advance) {
pos += advance;
if (c->readlen == -1) {
goto done;
} else {
/* include the trailing \r\n */
c->readlen += 2;
}
// printf("%p) %d elements list\n", c, c->mbulk);
/* Leave all the rest in the input buffer */
c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
goto processdata;
} else {
c->ibuf = sdstrim(c->ibuf,"\r\n");
clientDone(c);
return;
goto skip;
}
}
}
/* bulk read, did we read everything? */
if (((c->replytype == REPLY_MBULK && c->mbulk != -1) ||
(c->replytype == REPLY_BULK)) && c->readlen != -1 &&
(unsigned)c->readlen <= sdslen(c->ibuf))
{
// printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n",
// c->mbulk,c->readlen,sdslen(c->ibuf));
if (c->replytype == REPLY_BULK) {
clientDone(c);
} else if (c->replytype == REPLY_MBULK) {
// printf("%p) %d (%d)) ",c, c->mbulk, c->readlen);
// fwrite(c->ibuf,c->readlen,1,stdout);
// printf("\n");
if (--c->mbulk == 0) {
clientDone(c);
int canconsume;
if (c->readlen > 0) {
canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
c->readlen -= canconsume;
pos += canconsume;
}
if (c->readlen == 0)
goto done;
} else if (c->replytype == REPLY_MBULK) {
int advance = 0;
if (c->mbulk == -1) {
advance = readLen(c->ibuf+pos,&c->mbulk);
if (advance) {
pos += advance;
if (c->mbulk == -1)
goto done;
} else {
goto skip;
}
}
int canconsume;
while(c->mbulk > 0 && pos < len) {
if (c->readlen > 0) {
canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
c->readlen -= canconsume;
pos += canconsume;
if (c->readlen == 0)
c->mbulk--;
} else {
c->ibuf = sdsrange(c->ibuf,c->readlen,-1);
c->readlen = -1;
goto processdata;
advance = readLen(c->ibuf+pos,&c->readlen);
if (advance) {
pos += advance;
if (c->readlen == -1) {
c->mbulk--;
continue;
} else {
/* include the trailing \r\n */
c->readlen += 2;
}
} else {
goto skip;
}
}
}
if (c->mbulk == 0)
goto done;
}
skip:
c->ibuf = sdsrange(c->ibuf,pos,-1);
return;
done:
clientDone(c);
return;
}
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask)
......@@ -371,13 +390,13 @@ static void createMissingClients(client c) {
}
}
static void showLatencyReport(char *title) {
static void showLatencyReport(void) {
int j, seen = 0;
float perc, reqpersec;
reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
if (!config.quiet) {
printf("====== %s ======\n", title);
printf("====== %s ======\n", config.title);
printf(" %d requests completed in %.2f seconds\n", config.donerequests,
(float)config.totlatency/1000);
printf(" %d parallel clients\n", config.numclients);
......@@ -393,20 +412,20 @@ static void showLatencyReport(char *title) {
}
printf("%.2f requests per second\n\n", reqpersec);
} else {
printf("%s: %.2f requests per second\n", title, reqpersec);
printf("%s: %.2f requests per second\n", config.title, reqpersec);
}
}
static void prepareForBenchmark(void)
{
static void prepareForBenchmark(char *title) {
memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1));
config.title = title;
config.start = mstime();
config.donerequests = 0;
}
static void endBenchmark(char *title) {
static void endBenchmark(void) {
config.totlatency = mstime()-config.start;
showLatencyReport(title);
showLatencyReport();
freeAllClients();
}
......@@ -480,6 +499,18 @@ void parseOptions(int argc, char **argv) {
}
}
int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
float dt = (float)(mstime()-config.start)/1000.0;
float rps = (float)config.donerequests/dt;
printf("%s: %.2f\r", config.title, rps);
fflush(stdout);
return 250; /* every 250ms */
}
int main(int argc, char **argv) {
client c;
......@@ -491,6 +522,7 @@ int main(int argc, char **argv) {
config.requests = 10000;
config.liveclients = 0;
config.el = aeCreateEventLoop();
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
config.keepalive = 1;
config.donerequests = 0;
config.datasize = 3;
......@@ -514,7 +546,7 @@ int main(int argc, char **argv) {
if (config.idlemode) {
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
prepareForBenchmark();
prepareForBenchmark("IDLE");
c = createClient();
if (!c) exit(1);
c->obuf = sdsempty();
......@@ -525,25 +557,25 @@ int main(int argc, char **argv) {
}
do {
prepareForBenchmark();
prepareForBenchmark("PING");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"PING\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
endBenchmark("PING");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("PING (multi bulk)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
endBenchmark("PING (multi bulk)");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("SET");
c = createClient();
if (!c) exit(1);
c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize);
......@@ -557,106 +589,106 @@ int main(int argc, char **argv) {
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
endBenchmark("SET");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("GET");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("GET");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("INCR");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n");
prepareClientForReply(c,REPLY_INT);
createMissingClients(c);
aeMain(config.el);
endBenchmark("INCR");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LPUSH");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
prepareClientForReply(c,REPLY_INT);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LPUSH");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LPOP");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPOP mylist\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LPOP");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("SADD");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
endBenchmark("SADD");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("SPOP");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"SPOP myset\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("SPOP");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LPUSH (again, in order to bench LRANGE)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LPUSH (again, in order to bench LRANGE)");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LRANGE (first 100 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LRANGE (first 100 elements)");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LRANGE (first 300 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LRANGE (first 300 elements)");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LRANGE (first 450 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LRANGE (first 450 elements)");
endBenchmark();
prepareForBenchmark();
prepareForBenchmark("LRANGE (first 600 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
endBenchmark("LRANGE (first 600 elements)");
endBenchmark();
printf("\n");
} while(config.loop);
......
......@@ -909,7 +909,7 @@ int processCommand(redisClient *c) {
} else if (c->multibulk) {
if (c->bulklen == -1) {
if (((char*)c->argv[0]->ptr)[0] != '$') {
addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
addReplyError(c,"multi bulk protocol error");
resetClient(c);
return 1;
} else {
......@@ -922,7 +922,7 @@ int processCommand(redisClient *c) {
bulklen < 0 || bulklen > 1024*1024*1024)
{
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
......@@ -975,17 +975,14 @@ int processCommand(redisClient *c) {
* such wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
addReplySds(c,
sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
(char*)c->argv[0]->ptr));
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) {
addReplySds(c,
sdscatprintf(sdsempty(),
"-ERR wrong number of arguments for '%s' command\r\n",
cmd->name));
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
cmd->name);
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
......@@ -999,7 +996,7 @@ int processCommand(redisClient *c) {
bulklen < 0 || bulklen > 1024*1024*1024)
{
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
......@@ -1026,7 +1023,7 @@ int processCommand(redisClient *c) {
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
addReplyError(c,"operation not permitted");
resetClient(c);
return 1;
}
......@@ -1035,7 +1032,7 @@ int processCommand(redisClient *c) {
if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
zmalloc_used_memory() > server.maxmemory)
{
addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
addReplyError(c,"command not allowed when used memory > 'maxmemory'");
resetClient(c);
return 1;
}
......@@ -1045,7 +1042,7 @@ int processCommand(redisClient *c) {
&&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
resetClient(c);
return 1;
}
......@@ -1109,7 +1106,7 @@ void authCommand(redisClient *c) {
addReply(c,shared.ok);
} else {
c->authenticated = 0;
addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
addReplyError(c,"invalid password");
}
}
......
......@@ -47,6 +47,7 @@
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
#define REDIS_SHARED_INTEGERS 10000
#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
......@@ -309,6 +310,11 @@ typedef struct redisClient {
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
/* Response buffer */
int bufpos;
int buflen;
char buf[];
} redisClient;
struct saveparam {
......@@ -588,6 +594,8 @@ void resetClient(redisClient *c);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
void *addDeferredMultiBulkLength(redisClient *c);
void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
void addReplySds(redisClient *c, sds s);
void processInputBuffer(redisClient *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
......@@ -597,11 +605,23 @@ void addReplyBulkCString(redisClient *c, char *s);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
void addReplySds(redisClient *c, sds s);
void addReplyError(redisClient *c, char *err);
void addReplyStatus(redisClient *c, char *status);
void addReplyDouble(redisClient *c, double d);
void addReplyLongLong(redisClient *c, long long ll);
void addReplyUlong(redisClient *c, unsigned long ul);
void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
#else
void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
#endif
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
......
......@@ -179,7 +179,7 @@ void syncCommand(redisClient *c) {
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n"));
addReplyError(c,"Can't SYNC while not connected with my master");
return;
}
......@@ -188,7 +188,7 @@ void syncCommand(redisClient *c) {
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if (listLength(c->reply) != 0) {
addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
addReplyError(c,"SYNC is invalid with pending input");
return;
}
......@@ -226,7 +226,7 @@ void syncCommand(redisClient *c) {
redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
addReplyError(c,"Unable to perform background save");
return;
}
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
......
......@@ -33,7 +33,6 @@
#include "sds.h"
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <ctype.h>
#include "zmalloc.h"
......@@ -156,8 +155,8 @@ sds sdscpy(sds s, char *t) {
return sdscpylen(s, t, strlen(t));
}
sds sdscatprintf(sds s, const char *fmt, ...) {
va_list ap;
sds sdscatvprintf(sds s, const char *fmt, va_list ap) {
va_list cpy;
char *buf, *t;
size_t buflen = 16;
......@@ -169,9 +168,8 @@ sds sdscatprintf(sds s, const char *fmt, ...) {
if (buf == NULL) return NULL;
#endif
buf[buflen-2] = '\0';
va_start(ap, fmt);
vsnprintf(buf, buflen, fmt, ap);
va_end(ap);
va_copy(cpy,ap);
vsnprintf(buf, buflen, fmt, cpy);
if (buf[buflen-2] != '\0') {
zfree(buf);
buflen *= 2;
......@@ -184,6 +182,15 @@ sds sdscatprintf(sds s, const char *fmt, ...) {
return t;
}
sds sdscatprintf(sds s, const char *fmt, ...) {
va_list ap;
char *t;
va_start(ap, fmt);
t = sdscatvprintf(s,fmt,ap);
va_end(ap);
return t;
}
sds sdstrim(sds s, const char *cset) {
struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
char *start, *end, *sp, *ep;
......
......@@ -32,6 +32,7 @@
#define __SDS_H
#include <sys/types.h>
#include <stdarg.h>
typedef char *sds;
......@@ -53,6 +54,7 @@ sds sdscat(sds s, char *t);
sds sdscpylen(sds s, char *t, size_t len);
sds sdscpy(sds s, char *t);
sds sdscatvprintf(sds s, const char *fmt, va_list ap);
#ifdef __GNUC__
sds sdscatprintf(sds s, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
......
......@@ -307,7 +307,7 @@ void sortCommand(redisClient *c) {
outputlen = getop ? getop*(end-start+1) : end-start+1;
if (storekey == NULL) {
/* STORE option not specified, sent the sorting result to client */
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
addReplyMultiBulkLen(c,outputlen);
for (j = start; j <= end; j++) {
listNode *ln;
listIter li;
......@@ -369,7 +369,7 @@ void sortCommand(redisClient *c) {
* replaced. */
server.dirty += 1+outputlen;
touchWatchedKey(c->db,storekey);
addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
addReplyLongLong(c,outputlen);
}
/* Cleanup */
......
......@@ -249,7 +249,7 @@ void hmsetCommand(redisClient *c) {
robj *o;
if ((c->argc % 2) == 1) {
addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n"));
addReplyError(c,"wrong number of arguments for HMSET");
return;
}
......@@ -315,7 +315,7 @@ void hmgetCommand(redisClient *c) {
/* Note the check for o != NULL happens inside the loop. This is
* done because objects that cannot be found are considered to be
* an empty hash. The reply should then be a series of NULLs. */
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2));
addReplyMultiBulkLen(c,c->argc-2);
for (i = 2; i < c->argc; i++) {
if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) {
addReplyBulk(c,value);
......@@ -346,21 +346,19 @@ void hlenCommand(redisClient *c) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,REDIS_HASH)) return;
addReplyUlong(c,hashTypeLength(o));
addReplyLongLong(c,hashTypeLength(o));
}
void genericHgetallCommand(redisClient *c, int flags) {
robj *o, *lenobj, *obj;
robj *o, *obj;
unsigned long count = 0;
hashTypeIterator *hi;
void *replylen = NULL;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,REDIS_HASH)) return;
lenobj = createObject(REDIS_STRING,NULL);
addReply(c,lenobj);
decrRefCount(lenobj);
replylen = addDeferredMultiBulkLength(c);
hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != REDIS_ERR) {
if (flags & REDIS_HASH_KEY) {
......@@ -377,8 +375,7 @@ void genericHgetallCommand(redisClient *c, int flags) {
}
}
hashTypeReleaseIterator(hi);
lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count);
setDeferredMultiBulkLength(c,replylen,count);
}
void hkeysCommand(redisClient *c) {
......
......@@ -342,7 +342,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
server.dirty++;
}
addReplyUlong(c,listTypeLength(subject));
addReplyLongLong(c,listTypeLength(subject));
}
void lpushxCommand(redisClient *c) {
......@@ -366,7 +366,7 @@ void linsertCommand(redisClient *c) {
void llenCommand(redisClient *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,REDIS_LIST)) return;
addReplyUlong(c,listTypeLength(o));
addReplyLongLong(c,listTypeLength(o));
}
void lindexCommand(redisClient *c) {
......@@ -494,7 +494,7 @@ void lrangeCommand(redisClient *c) {
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
addReplyMultiBulkLen(c,rangelen);
listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
for (j = 0; j < rangelen; j++) {
redisAssert(listTypeNext(li,&entry));
......@@ -594,7 +594,7 @@ void lremCommand(redisClient *c) {
decrRefCount(obj);
if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
addReplyLongLong(c,removed);
if (removed) touchWatchedKey(c->db,c->argv[1]);
}
......@@ -772,7 +772,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
redisAssert(ln != NULL);
receiver = ln->value;
addReplySds(receiver,sdsnew("*2\r\n"));
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
unblockClientWaitingData(receiver);
......@@ -792,7 +792,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
/* Make sure the timeout is not negative */
if (lltimeout < 0) {
addReplySds(c,sdsnew("-ERR timeout is negative\r\n"));
addReplyError(c,"timeout is negative");
return;
}
......@@ -822,7 +822,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
* "real" command will add the last element (the value)
* for us. If this souds like an hack to you it's just
* because it is... */
addReplySds(c,sdsnew("*2\r\n"));
addReplyMultiBulkLen(c,2);
addReplyBulk(c,argv[1]);
popGenericCommand(c,where);
......
......@@ -276,7 +276,7 @@ void scardCommand(redisClient *c) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,REDIS_SET)) return;
addReplyUlong(c,setTypeSize(o));
addReplyLongLong(c,setTypeSize(o));
}
void spopCommand(redisClient *c) {
......@@ -320,7 +320,8 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *ele, *lenobj = NULL, *dstset = NULL;
robj *ele, *dstset = NULL;
void *replylen = NULL;
unsigned long j, cardinality = 0;
for (j = 0; j < setnum; j++) {
......@@ -356,9 +357,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
* to the output list and save the pointer to later modify it with the
* right length */
if (!dstkey) {
lenobj = createObject(REDIS_STRING,NULL);
addReply(c,lenobj);
decrRefCount(lenobj);
replylen = addDeferredMultiBulkLength(c);
} else {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
......@@ -400,7 +399,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
touchWatchedKey(c->db,dstkey);
server.dirty++;
} else {
lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
setDeferredMultiBulkLength(c,replylen,cardinality);
}
zfree(sets);
}
......@@ -470,7 +469,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
/* Output the content of the resulting set, if not in STORE mode */
if (!dstkey) {
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNext(si)) != NULL) {
addReplyBulk(c,ele);
......
......@@ -12,7 +12,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK)
return;
if (seconds <= 0) {
addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n"));
addReplyError(c,"invalid expire time in SETEX");
return;
}
}
......@@ -79,7 +79,7 @@ void getsetCommand(redisClient *c) {
void mgetCommand(redisClient *c) {
int j;
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
addReplyMultiBulkLen(c,c->argc-1);
for (j = 1; j < c->argc; j++) {
robj *o = lookupKeyRead(c->db,c->argv[j]);
if (o == NULL) {
......@@ -98,7 +98,7 @@ void msetGenericCommand(redisClient *c, int nx) {
int j, busykeys = 0;
if ((c->argc % 2) == 0) {
addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
addReplyError(c,"wrong number of arguments for MSET");
return;
}
/* Handle the NX flag. The MSETNX semantic is to return zero and don't
......@@ -211,7 +211,7 @@ void appendCommand(redisClient *c) {
}
touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen));
addReplyLongLong(c,totlen);
}
void substrCommand(redisClient *c) {
......
......@@ -355,8 +355,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i
*score = scoreval;
}
if (isnan(*score)) {
addReplySds(c,
sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
addReplyError(c,"resulting score is not a number (NaN)");
zfree(score);
/* Note that we don't need to check if the zset may be empty and
* should be removed here, as we can only obtain Nan as score if
......@@ -561,7 +560,8 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
/* expect setnum input keys to be given */
setnum = atoi(c->argv[2]->ptr);
if (setnum < 1) {
addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
addReplyError(c,
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
......@@ -782,8 +782,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) {
}
/* Return the result in form of a multi-bulk reply */
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
withscores ? (rangelen*2) : rangelen));
addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
for (j = 0; j < rangelen; j++) {
ele = ln->obj;
addReplyBulk(c,ele);
......@@ -840,8 +839,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
badsyntax = 1;
if (badsyntax) {
addReplySds(c,
sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
return;
}
......@@ -866,7 +864,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
zset *zsetobj = o->ptr;
zskiplist *zsl = zsetobj->zsl;
zskiplistNode *ln;
robj *ele, *lenobj = NULL;
robj *ele;
void *replylen = NULL;
unsigned long rangelen = 0;
/* Get the first node with the score >= min, or with
......@@ -884,11 +883,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
* are in the list, so we push this object that will represent
* the multi-bulk length in the output buffer, and will "fix"
* it later */
if (!justcount) {
lenobj = createObject(REDIS_STRING,NULL);
addReply(c,lenobj);
decrRefCount(lenobj);
}
if (!justcount)
replylen = addDeferredMultiBulkLength(c);
while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
if (offset) {
......@@ -910,7 +906,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
setDeferredMultiBulkLength(c,replylen,
withscores ? (rangelen*2) : rangelen);
}
}
......@@ -933,7 +929,7 @@ void zcardCommand(redisClient *c) {
checkType(c,o,REDIS_ZSET)) return;
zs = o->ptr;
addReplyUlong(c,zs->zsl->length);
addReplyLongLong(c,zs->zsl->length);
}
void zscoreCommand(redisClient *c) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册