diff --git a/src/aof.c b/src/aof.c index cfbc941d5fe292271e211b27ff519804efa5ffe6..72b476e7b1bf0ec7f1d1a54b8d39071ed35e1cbd 100644 --- a/src/aof.c +++ b/src/aof.c @@ -296,6 +296,7 @@ struct redisClient *createFakeClient(void) { c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->reply = listCreate(); c->reply_bytes = 0; + c->obuf_soft_limit_reached_time = 0; c->watched_keys = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); diff --git a/src/networking.c b/src/networking.c index 47fbcc58b6b452ff711bc772604c9614c4d90599..6c3e7d8148568b9ef379f78286085597c0a0f5db 100644 --- a/src/networking.c +++ b/src/networking.c @@ -48,6 +48,7 @@ redisClient *createClient(int fd) { c->replstate = REDIS_REPL_NONE; c->reply = listCreate(); c->reply_bytes = 0; + c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->bpop.keys = NULL; @@ -139,6 +140,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) { } } c->reply_bytes += sdslen(o->ptr); + asyncCloseClientOnOutputBufferLimitReached(c); } /* This method takes responsibility over the sds. When it is no longer @@ -168,6 +170,7 @@ void _addReplySdsToList(redisClient *c, sds s) { listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); } } + asyncCloseClientOnOutputBufferLimitReached(c); } void _addReplyStringToList(redisClient *c, char *s, size_t len) { @@ -191,6 +194,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { } } c->reply_bytes += len; + asyncCloseClientOnOutputBufferLimitReached(c); } /* ----------------------------------------------------------------------------- @@ -318,6 +322,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { listDelNode(c->reply,ln->next); } } + asyncCloseClientOnOutputBufferLimitReached(c); } /* Add a duble as a bulk reply */ @@ -558,12 +563,42 @@ void freeClient(redisClient *c) { } } } + + /* If this client was scheduled for async freeing we need to remove it + * from the queue. */ + if (c->flags & REDIS_CLOSE_ASAP) { + ln = listSearchKey(server.clients_to_close,c); + redisAssert(ln != NULL); + listDelNode(server.clients_to_close,ln); + } + /* Release memory */ zfree(c->argv); freeClientMultiState(c); zfree(c); } +/* Schedule a client to free it at a safe time in the serverCron() function. + * This function is useful when we need to terminate a client but we are in + * a context where calling freeClient() is not possible, because the client + * should be valid for the continuation of the flow of the program. */ +void freeClientAsync(redisClient *c) { + if (c->flags & REDIS_CLOSE_ASAP) return; + c->flags |= REDIS_CLOSE_ASAP; + listAddNodeTail(server.clients_to_close,c); +} + +void freeClientsInAsyncFreeQueue(void) { + while (listLength(server.clients_to_close)) { + listNode *ln = listFirst(server.clients_to_close); + redisClient *c = listNodeValue(ln); + + c->flags &= ~REDIS_CLOSE_ASAP; + freeClient(c); + listDelNode(server.clients_to_close,ln); + } +} + void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@ -1006,6 +1041,7 @@ sds getClientInfoString(redisClient *client) { if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd'; if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & REDIS_UNBLOCKED) *p++ = 'u'; + if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -1164,3 +1200,63 @@ int getClientLimitClass(redisClient *c) { return REDIS_CLIENT_LIMIT_CLASS_PUBSUB; return REDIS_CLIENT_LIMIT_CLASS_NORMAL; } + +/* The function checks if the client reached output buffer soft or hard + * limit, and also update the state needed to check the soft limit as + * a side effect. + * + * Return value: non-zero if the client reached the soft or the hard limit. + * Otherwise zero is returned. */ +int checkClientOutputBufferLimits(redisClient *c) { + int soft = 0, hard = 0, class; + unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + + class = getClientLimitClass(c); + if (server.client_obuf_limits[class].hard_limit_bytes && + used_mem >= server.client_obuf_limits[class].hard_limit_bytes) + hard = 1; + if (server.client_obuf_limits[class].soft_limit_bytes && + used_mem >= server.client_obuf_limits[class].soft_limit_bytes) + soft = 1; + + /* We need to check if the soft limit is reached continuously for the + * specified amount of seconds. */ + if (soft) { + if (c->obuf_soft_limit_reached_time == 0) { + c->obuf_soft_limit_reached_time = server.unixtime; + soft = 0; /* First time we see the soft limit reached */ + } else { + time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; + + if (elapsed <= + server.client_obuf_limits[class].soft_limit_seconds) { + soft = 0; /* The client still did not reached the max number of + seconds for the soft limit to be considered + reached. */ + } + } + } else { + c->obuf_soft_limit_reached_time = 0; + } + return soft || hard; +} + +/* Asynchronously close a client if soft or hard limit is reached on the + * output buffer size. If the client will be closed 1 is returend, otherwise 0 + * is returned. + * + * Note: we need to close the client asynchronously because this function is + * called from contexts where the client can't be freed safely, i.e. from the + * lower level functions pushing data inside the client output buffers. */ +int asyncCloseClientOnOutputBufferLimitReached(redisClient *c) { + if (checkClientOutputBufferLimits(c)) { + sds client = getClientInfoString(c); + + freeClientAsync(c); + redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits."); + sdsfree(client); + return 1; + } else { + return 0; + } +} diff --git a/src/redis.c b/src/redis.c index 17d7517f982cd0222d83ab6419dd72687ad4d82d..711cebe03b95005d1eb94cc4d60eb31925ad9527 100644 --- a/src/redis.c +++ b/src/redis.c @@ -926,6 +926,17 @@ void initServerConfig() { server.repl_serve_stale_data = 1; server.repl_down_since = -1; + /* Client output buffer limits */ + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*256; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*256; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*32; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60; + /* Double constants initialization */ R_Zero = 0.0; R_PosInf = 1.0/R_Zero; @@ -1002,6 +1013,7 @@ void initServer() { server.current_client = NULL; server.clients = listCreate(); + server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); diff --git a/src/redis.h b/src/redis.h index 8b262171c4e970df9b22292b4a85dee19b4fbd8e..dbb56ca3e24c920208ce1f79a913bfae5334bf03 100644 --- a/src/redis.h +++ b/src/redis.h @@ -142,6 +142,7 @@ server.unblocked_clients */ #define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */ #define REDIS_ASKING 1024 /* Client issued the ASKING command */ +#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */ /* Client request types */ #define REDIS_REQ_INLINE 1 @@ -152,6 +153,7 @@ #define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0 #define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1 #define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2 +#define REDIS_CLIENT_LIMIT_NUM_CLASSES 3 /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -315,6 +317,7 @@ typedef struct redisClient { unsigned long reply_bytes; /* Tot bytes of objects in reply list */ int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ + time_t obuf_soft_limit_reached_time; int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int slaveseldb; /* slave selected db, if this client is a slave */ int authenticated; /* when requirepass is non-NULL */ @@ -374,6 +377,12 @@ typedef struct zset { zskiplist *zsl; } zset; +typedef struct clientBufferLimitsConfig { + unsigned long hard_limit_bytes; + unsigned long soft_limit_bytes; + time_t soft_limit_seconds; +} clientBufferLimitsConfig; + /*----------------------------------------------------------------------------- * Redis cluster data structures *----------------------------------------------------------------------------*/ @@ -526,6 +535,7 @@ struct redisServer { int sofd; /* Unix socket file descriptor */ int cfd; /* Cluster bus lisetning socket */ list *clients; /* List of active clients */ + list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ redisClient *current_client; /* Current client, only used on crash report */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ @@ -559,6 +569,7 @@ struct redisServer { size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int daemonize; /* True if running as a daemon */ + clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES]; /* AOF persistence */ int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ @@ -792,6 +803,8 @@ sds getAllClientsInfoString(void); void rewriteClientCommandVector(redisClient *c, int argc, ...); void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); unsigned long getClientOutputBufferMemoryUsage(redisClient *c); +void freeClientsInAsyncFreeQueue(void); +int asyncCloseClientOnOutputBufferLimitReached(redisClient *c); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...)