#include "redis.h"
#include <sys/uio.h>

/* Dup method installed on the client reply list: instead of copying the
 * object it takes one more reference and returns the same pointer. */
void *dupClientReplyValue(void *o) {
    incrRefCount((robj*)o);
    return o;
}

/* Match method installed on the pubsub patterns list: delegates the
 * comparison of the two objects to equalStringObjects(). */
int listMatchObjects(void *a, void *b) {
    return equalStringObjects(a,b);
}

/* Allocate and initialize the client state for the socket 'fd', register
 * the readable event handler and append the client to server.clients.
 *
 * Returns the new client, or NULL when the allocation fails or the readable
 * event cannot be installed (in the latter case 'fd' is closed here). */
redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(redisClient));

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    /* The allocation must be checked before any field of 'c' is written. */
    if (!c) return NULL;
    c->bufpos = 0;

    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
        readQueryFromClient, c) == AE_ERR)
    {
        close(fd);
        zfree(c);
        return NULL;
    }

    selectDb(c,0);
    c->fd = fd;
    c->querybuf = sdsempty();
    c->newline = NULL;
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;
    c->multibulk = 0;
    c->mbargc = 0;
    c->mbargv = NULL;
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    c->reply = listCreate();
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->blocking_keys = NULL;
    c->blocking_keys_num = 0;
    c->io_keys = listCreate();
    c->watched_keys = listCreate();
    listSetFreeMethod(c->io_keys,decrRefCount);
    c->pubsub_channels = dictCreate(&setDictType,NULL);
    c->pubsub_patterns = listCreate();
    listSetFreeMethod(c->pubsub_patterns,decrRefCount);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}

/* Make sure the writable event handler is installed before queueing output.
 *
 * The handler is only created when nothing is pending yet (empty static
 * buffer and empty reply list) and the replication state is NONE or ONLINE.
 * Returns REDIS_ERR when the client has no valid fd or the event cannot be
 * created, REDIS_OK otherwise. */
int _installWriteEvent(redisClient *c) {
    if (c->fd <= 0) return REDIS_ERR;
    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
    return REDIS_OK;
}

/* Create a duplicate of the last object in the reply list when
 * it is not exclusively owned by the reply list.
 *
 * Returns the (possibly new) object stored in the last node. */
robj *dupLastObjectIfNeeded(list *reply) {
    robj *new, *cur;
    listNode *ln;

    redisAssert(listLength(reply) > 0);
    ln = listLast(reply);
    cur = listNodeValue(ln);
    if (cur->refcount > 1) {
        new = dupStringObject(cur);
        decrRefCount(cur);
        listNodeValue(ln) = new;
    }
    return listNodeValue(ln);
}

/* Try to append 'len' bytes from 's' to the client static buffer.
 * Returns REDIS_OK on success, REDIS_ERR when the data must go to the
 * reply list instead. */
int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return REDIS_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return REDIS_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return REDIS_OK;
}

/* Queue the object 'o' on the reply list, either by appending its content
 * to the tail object or by adding a new node holding a reference to 'o'. */
void _addReplyObjectToList(redisClient *c, robj *o) {
    robj *tail;

    if (listLength(c->reply) == 0) {
        incrRefCount(o);
        listAddNodeTail(c->reply,o);
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
        {
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
        } else {
            incrRefCount(o);
            listAddNodeTail(c->reply,o);
        }
    }
}

/* This method takes responsibility over the sds. When it is no longer
 * needed it will be free'd, otherwise it ends up in a robj. */
void _addReplySdsToList(redisClient *c, sds s) {
    robj *tail;

    if (listLength(c->reply) == 0) {
        listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
        {
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
            sdsfree(s);
        } else {
            listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
        }
    }
}

/* Queue 'len' bytes from 's' on the reply list, appending to the tail
 * object when the result fits in REDIS_REPLY_CHUNK_BYTES, otherwise
 * creating a new string object. */
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
    robj *tail;

    if (listLength(c->reply) == 0) {
        listAddNodeTail(c->reply,createStringObject(s,len));
    } else {
        tail = listNodeValue(listLast(c->reply));

        /* Append to this object when possible. */
        if (tail->ptr != NULL &&
            sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
        {
            tail = dupLastObjectIfNeeded(c->reply);
            tail->ptr = sdscatlen(tail->ptr,s,len);
        } else {
            listAddNodeTail(c->reply,createStringObject(s,len));
        }
    }
}

/* Queue the string object 'obj' as output for the client. Nothing is
 * queued when the write event cannot be installed. */
void addReply(redisClient *c, robj *obj) {
    if (_installWriteEvent(c) != REDIS_OK) return;
    redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);

    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (obj->encoding == REDIS_ENCODING_RAW) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
    } else {
        obj = getDecodedObject(obj);
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    }
}

/* Queue the sds string 's' as output for the client. Ownership of 's' is
 * taken by this function on every path. */
void addReplySds(redisClient *c, sds s) {
    if (_installWriteEvent(c) != REDIS_OK) {
        /* The caller expects the sds to be free'd. */
        sdsfree(s);
        return;
    }
    if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
        sdsfree(s);
    } else {
        /* This method free's the sds when it is no longer needed. */
        _addReplySdsToList(c,s);
    }
}

/* Queue 'len' bytes from 's' as output for the client; the bytes are
 * copied, the caller keeps ownership of 's'. */
void addReplyString(redisClient *c, char *s, size_t len) {
    if (_installWriteEvent(c) != REDIS_OK) return;
    if (_addReplyToBuffer(c,s,len) != REDIS_OK)
        _addReplyStringToList(c,s,len);
}

/* Emit "-ERR <s>\r\n" where <s> is the first 'len' bytes of 's'. */
void _addReplyError(redisClient *c, char *s, size_t len) {
    addReplyString(c,"-ERR ",5);
    addReplyString(c,s,len);
    addReplyString(c,"\r\n",2);
}

/* Emit an error reply built from the C string 'err'. */
void addReplyError(redisClient *c, char *err) {
    _addReplyError(c,err,strlen(err));
}

/* Emit an error reply whose text is produced by a printf-style format. */
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);
    _addReplyError(c,s,sdslen(s));
    sdsfree(s);
}

/* Emit "+<s>\r\n" where <s> is the first 'len' bytes of 's'. */
void _addReplyStatus(redisClient *c, char *s, size_t len) {
    addReplyString(c,"+",1);
    addReplyString(c,s,len);
    addReplyString(c,"\r\n",2);
}

/* Emit a status reply built from the C string 'status'. */
void addReplyStatus(redisClient *c, char *status) {
    _addReplyStatus(c,status,strlen(status));
}

/* Emit a status reply whose text is produced by a printf-style format. */
void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
    va_list ap;
    va_start(ap,fmt);
    sds s = sdscatvprintf(sdsempty(),fmt,ap);
    va_end(ap);
    _addReplyStatus(c,s,sdslen(s));
    sdsfree(s);
}

/* Adds an empty object to the reply list that will contain the multi bulk
 * length, which is not known when this function is called.
 *
 * Returns the list node to pass to setDeferredMultiBulkLength(), or NULL
 * when the write event could not be installed. */
void *addDeferredMultiBulkLength(redisClient *c) {
    /* Note that we install the write event here even if the object is not
     * ready to be sent, since we are sure that before returning to the
     * event loop setDeferredMultiBulkLength() will be called. */
    if (_installWriteEvent(c) != REDIS_OK) return NULL;
    listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
    return listLast(c->reply);
}

/* Populate the length object and try glueing it to the next chunk. */
void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
    listNode *ln = (listNode*)node;
    robj *len, *next;

    /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
    if (node == NULL) return;

    len = listNodeValue(ln);
    len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
    if (ln->next != NULL) {
        next = listNodeValue(ln->next);

        /* Only glue when the next node is non-NULL (an sds in this case) */
        if (next->ptr != NULL) {
            len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
            listDelNode(c->reply,ln->next);
        }
    }
}

/* Emit the double 'd' as a bulk reply, formatted with "%.17g". */
void addReplyDouble(redisClient *c, double d) {
    char dbuf[128], sbuf[128];
    int dlen, slen;
    dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
    slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
    addReplyString(c,sbuf,slen);
}

/* Emit "<prefix><ll>\r\n". */
void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
    char buf[128];
    int len;
    buf[0] = prefix;
    len = ll2string(buf+1,sizeof(buf)-1,ll);
    buf[len+1] = '\r';
    buf[len+2] = '\n';
    addReplyString(c,buf,len+3);
}

/* Emit an integer reply (':' prefix). */
void addReplyLongLong(redisClient *c, long long ll) {
    _addReplyLongLong(c,ll,':');
}

/* Emit a multi bulk length header ('*' prefix). */
void addReplyMultiBulkLen(redisClient *c, long length) {
    _addReplyLongLong(c,length,'*');
}

/* Emit the bulk length header ('$' prefix) for 'obj': the sds length for
 * RAW encoded objects, otherwise the number of characters of the integer
 * stored in obj->ptr when printed in radix 10. */
void addReplyBulkLen(redisClient *c, robj *obj) {
    size_t len;

    if (obj->encoding == REDIS_ENCODING_RAW) {
        len = sdslen(obj->ptr);
    } else {
        long n = (long)obj->ptr;

        /* Compute how many bytes will take this integer as a radix 10 string.
         * The value is not negated: -LONG_MIN overflows, while integer
         * division truncates toward zero and so works on negative values. */
        len = 1;
        if (n < 0) len++; /* room for the minus sign */
        while((n = n/10) != 0) {
            len++;
        }
    }
    _addReplyLongLong(c,len,'$');
}

/* Emit 'obj' as a bulk reply: length header, payload, CRLF. */
void addReplyBulk(redisClient *c, robj *obj) {
    addReplyBulkLen(c,obj);
    addReply(c,obj);
    addReply(c,shared.crlf);
}

/* In the CONFIG command we need to add vanilla C string as bulk replies */
void addReplyBulkCString(redisClient *c, char *s) {
    if (s == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        robj *o = createStringObject(s,strlen(s));
        addReplyBulk(c,o);
        decrRefCount(o);
    }
}

/* Event handler for the listening socket: accept one connection and create
 * the client for it, enforcing the maxclients limit. */
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[128];
    redisClient *c;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    cfd = anetAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    if ((c = createClient(cfd)) == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(cfd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in nonblocking
     * mode and we can send an error for free using the Kernel I/O */
    if (server.maxclients && listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
}

/* Release the references held by argv and mbargv and reset both counters.
 * The arrays themselves are not freed here. */
static void freeClientArgv(redisClient *c) {
    int j;

    for (j = 0; j < c->argc; j++)
        decrRefCount(c->argv[j]);
    for (j = 0; j < c->mbargc; j++)
        decrRefCount(c->mbargv[j]);
    c->argc = 0;
    c->mbargc = 0;
}

/* Tear down a client: release its buffers and lists, remove its file
 * events, close the socket, unlink it from every server-side list it is
 * registered in and free the structure. */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call
     * unblockClientWaitingData() to avoid processInputBuffer() will get
     * called. Also it is important to remove the file events after
     * this, because this call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Obvious cleanup */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.vm_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.vm_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup.
     * Case 1: we lost the connection with a slave. */
    if (c->flags & REDIS_SLAVE) {
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }

    /* Case 2: we lost the connection with the master. */
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;

        /* Since we lost the connection with the master, we should also
         * close the connection with all our slaves if we have any, so
         * when we'll resync with the master the other slaves will sync again
         * with us as well. Note that also when the slave is not connected
         * to the master it will keep refusing connections by other slaves. */
        while (listLength(server.slaves)) {
            ln = listFirst(server.slaves);
            freeClient((redisClient*)ln->value);
        }
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}

/* Writable event handler: send pending output, first from the static
 * buffer and then from the reply list. The write event is removed only
 * once both are fully drained. The client is freed on write errors other
 * than EAGAIN. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send. The writev() path only
     * handles the reply list, so it can be used only once the static buffer
     * (whose content always precedes the list content) has been flushed. */
    if (!server.glueoutputbuf &&
        c->bufpos == 0 &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(c->bufpos > 0 || listLength(c->reply)) {
        if (c->bufpos > 0) {
            if (c->flags & REDIS_MASTER) {
                /* Don't reply to a master */
                nwritten = c->bufpos - c->sentlen;
            } else {
                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
                if (nwritten <= 0) break;
            }
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if (c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            if (c->flags & REDIS_MASTER) {
                /* Don't reply to a master */
                nwritten = objlen - c->sentlen;
            } else {
                nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
                if (nwritten <= 0) break;
            }
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
            }
        }
        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    /* Remove the handler only when nothing is left to send. A partially
     * written static buffer must keep both the event and 'sentlen', since
     * _installWriteEvent() does not reinstall the handler while bufpos > 0. */
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

/* Variant of sendReplyToClient() that sends several reply list objects with
 * a single writev() call. Only the reply list is handled here, not the
 * static buffer. */
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
{
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen, willwrite;
    robj *o;
    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
    int offset, ion = 0;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    listNode *node;
    while (listLength(c->reply)) {
        offset = c->sentlen;
        ion = 0;
        willwrite = 0;

        /* fill-in the iov[] array */
        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
            o = listNodeValue(node);
            objlen = sdslen(o->ptr);

            /* Apply the per-event limit only after some payload was written
             * or queued: otherwise a head object larger than the limit would
             * never be sent and the client would stall forever. */
            if ((totwritten > 0 || willwrite > 0) &&
                totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
                break;

            if(ion == REDIS_WRITEV_IOVEC_COUNT)
                break; /* no more iovecs */

            iov[ion].iov_base = ((char*)o->ptr) + offset;
            iov[ion].iov_len = objlen - offset;
            willwrite += objlen - offset;
            offset = 0; /* just for the first item */
            ion++;
        }

        if(willwrite == 0)
            break;

        /* write all collected blocks at once */
        if((nwritten = writev(fd, iov, ion)) < 0) {
            if (errno != EAGAIN) {
                redisLog(REDIS_VERBOSE,
                         "Error writing to client: %s", strerror(errno));
                freeClient(c);
                return;
            }
            break;
        }

        totwritten += nwritten;
        offset = c->sentlen;

        /* remove written robjs from c->reply */
        while (nwritten && listLength(c->reply)) {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);

            if(nwritten >= objlen - offset) {
                listDelNode(c->reply, listFirst(c->reply));
                nwritten -= objlen - offset;
                c->sentlen = 0;
            } else {
                /* partial write */
                c->sentlen += nwritten;
                break;
            }
            offset = 0;
        }
    }

    if (totwritten > 0)
        c->lastinteraction = time(NULL);

    /* Same rule as sendReplyToClient(): keep the handler while the static
     * buffer still holds data. */
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

/* resetClient prepare the client to process the next command */
void resetClient(redisClient *c) {
    freeClientArgv(c);
    c->bulklen = -1;
    c->multibulk = 0;
    c->newline = NULL;
}

/* Walk server.clients: free the clients idle for more than
 * server.maxidletime (slaves, masters, blocked clients and pubsub
 * subscribers are exempt) and time out blocked clients whose 'blockingto'
 * deadline has passed. */
void closeTimedoutClients(void) {
    redisClient *c;
    listNode *ln;
    time_t now = time(NULL);
    listIter li;

    listRewind(server.clients,&li);
    while ((ln = listNext(&li)) != NULL) {
        c = listNodeValue(ln);
        if (server.maxidletime &&
            !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
            !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
            !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
            dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
            listLength(c->pubsub_patterns) == 0 &&
            (now - c->lastinteraction > server.maxidletime))
        {
            redisLog(REDIS_VERBOSE,"Closing idle client");
            freeClient(c);
        } else if (c->flags & REDIS_BLOCKED) {
            if (c->blockingto != 0 && c->blockingto < now) {
                addReply(c,shared.nullmultibulk);
                unblockClientWaitingData(c);
            }
        }
    }
}

/* Parse the client query buffer and execute every complete command found
 * in it: either an inline command line split on spaces, or the bulk payload
 * announced by the previous command line. */
void processInputBuffer(redisClient *c) {
    /* Set after the first pass: readQueryFromClient() only scans the newly
     * received bytes, so once a command is consumed the leftover buffer has
     * to be scanned here or pipelined commands would be left unprocessed. */
    int seeknewline = 0;

again:
    /* Before to process the input buffer, make sure the client is not
     * waiting for a blocking operation such as BLPOP. Note that the first
     * iteration the client is never blocked, otherwise the processInputBuffer
     * would not be called at all, but after the execution of the first commands
     * in the input buffer the client may be blocked, and the "goto again"
     * will try to reiterate. The following line will make it return asap. */
    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;

    if (seeknewline && c->bulklen == -1)
        c->newline = strchr(c->querybuf,'\n');
    seeknewline = 1;

    if (c->bulklen == -1) {
        /* Read the first line of the query */
        size_t querylen;

        if (c->newline) {
            char *p = c->newline;
            sds query, *argv;
            int argc, j;

            c->newline = NULL;
            query = c->querybuf;
            c->querybuf = sdsempty();
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* leave data after the first line of the query in the buffer */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            /* and "\r" if any; never look before the start of the buffer */
            if (p > query && *(p-1) == '\r') *(p-1) = '\0';
            sdsupdatelen(query);

            /* Now we can split the query in arguments */
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
            sdsfree(query);

            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);

            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            if (c->argc) {
                /* Execute the command. If the client is still valid
                 * after processCommand() return and there is something
                 * on the query buffer try to process the next command. */
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
            } else {
                /* Nothing to process, argc == 0. Just process the query
                 * buffer if it's not empty or return to the caller */
                if (sdslen(c->querybuf)) goto again;
            }
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            redisLog(REDIS_VERBOSE, "Client protocol error");
            freeClient(c);
            return;
        }
    } else {
        /* Bulk read handling. Note that if we are at this point
           the client already sent a command terminated with a newline,
           we are reading the bulk data that is actually the last
           argument of the command. */
        int qbl = sdslen(c->querybuf);

        if (c->bulklen <= qbl) {
            /* Copy everything but the final CRLF as final argument */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
            /* Process the command. If the client is still valid after
             * the processing and there is more data in the buffer
             * try to parse it. */
            if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        }
    }
}

/* Readable event handler: read up to REDIS_IOBUF_LEN bytes from the socket,
 * append them to the query buffer and process it. The client is freed on
 * read errors other than EAGAIN and when the peer closes the connection. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN);
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        size_t oldlen = sdslen(c->querybuf);
        c->querybuf = sdscatlen(c->querybuf, buf, nread);
        c->lastinteraction = time(NULL);
        /* Scan this new piece of the query for the newline. We do this
         * here in order to make sure we perform this scan just one time
         * per piece of buffer, leading to an O(N) scan instead of O(N*N) */
        if (c->bulklen == -1 && c->newline == NULL)
            c->newline = strchr(c->querybuf+oldlen,'\n');
    } else {
        return;
    }
    processInputBuffer(c);
}