提交 7eac2a75 编写于 作者: A antirez

Implementation of the internals that make possible to terminate clients...

Implementation of the internals that make possible to terminate clients overcoming configured output buffer (soft and hard) limits.
上级 890da62e
...@@ -296,6 +296,7 @@ struct redisClient *createFakeClient(void) { ...@@ -296,6 +296,7 @@ struct redisClient *createFakeClient(void) {
c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0; c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
c->watched_keys = listCreate(); c->watched_keys = listCreate();
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
......
...@@ -48,6 +48,7 @@ redisClient *createClient(int fd) { ...@@ -48,6 +48,7 @@ redisClient *createClient(int fd) {
c->replstate = REDIS_REPL_NONE; c->replstate = REDIS_REPL_NONE;
c->reply = listCreate(); c->reply = listCreate();
c->reply_bytes = 0; c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
c->bpop.keys = NULL; c->bpop.keys = NULL;
...@@ -139,6 +140,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) { ...@@ -139,6 +140,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
} }
} }
c->reply_bytes += sdslen(o->ptr); c->reply_bytes += sdslen(o->ptr);
asyncCloseClientOnOutputBufferLimitReached(c);
} }
/* This method takes responsibility over the sds. When it is no longer /* This method takes responsibility over the sds. When it is no longer
...@@ -168,6 +170,7 @@ void _addReplySdsToList(redisClient *c, sds s) { ...@@ -168,6 +170,7 @@ void _addReplySdsToList(redisClient *c, sds s) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c);
} }
void _addReplyStringToList(redisClient *c, char *s, size_t len) { void _addReplyStringToList(redisClient *c, char *s, size_t len) {
...@@ -191,6 +194,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { ...@@ -191,6 +194,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
} }
} }
c->reply_bytes += len; c->reply_bytes += len;
asyncCloseClientOnOutputBufferLimitReached(c);
} }
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
...@@ -318,6 +322,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { ...@@ -318,6 +322,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
listDelNode(c->reply,ln->next); listDelNode(c->reply,ln->next);
} }
} }
asyncCloseClientOnOutputBufferLimitReached(c);
} }
/* Add a duble as a bulk reply */ /* Add a duble as a bulk reply */
...@@ -558,12 +563,42 @@ void freeClient(redisClient *c) { ...@@ -558,12 +563,42 @@ void freeClient(redisClient *c) {
} }
} }
} }
/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
if (c->flags & REDIS_CLOSE_ASAP) {
ln = listSearchKey(server.clients_to_close,c);
redisAssert(ln != NULL);
listDelNode(server.clients_to_close,ln);
}
/* Release memory */ /* Release memory */
zfree(c->argv); zfree(c->argv);
freeClientMultiState(c); freeClientMultiState(c);
zfree(c); zfree(c);
} }
/* Schedule a client to free it at a safe time in the serverCron() function.
* This function is useful when we need to terminate a client but we are in
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(redisClient *c) {
if (c->flags & REDIS_CLOSE_ASAP) return;
c->flags |= REDIS_CLOSE_ASAP;
listAddNodeTail(server.clients_to_close,c);
}
void freeClientsInAsyncFreeQueue(void) {
while (listLength(server.clients_to_close)) {
listNode *ln = listFirst(server.clients_to_close);
redisClient *c = listNodeValue(ln);
c->flags &= ~REDIS_CLOSE_ASAP;
freeClient(c);
listDelNode(server.clients_to_close,ln);
}
}
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata; redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen; int nwritten = 0, totwritten = 0, objlen;
...@@ -1006,6 +1041,7 @@ sds getClientInfoString(redisClient *client) { ...@@ -1006,6 +1041,7 @@ sds getClientInfoString(redisClient *client) {
if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd'; if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u'; if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
if (p == flags) *p++ = 'N'; if (p == flags) *p++ = 'N';
*p++ = '\0'; *p++ = '\0';
...@@ -1164,3 +1200,63 @@ int getClientLimitClass(redisClient *c) { ...@@ -1164,3 +1200,63 @@ int getClientLimitClass(redisClient *c) {
return REDIS_CLIENT_LIMIT_CLASS_PUBSUB; return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
return REDIS_CLIENT_LIMIT_CLASS_NORMAL; return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
} }
/* The function checks if the client reached output buffer soft or hard
* limit, and also update the state needed to check the soft limit as
* a side effect.
*
* Return value: non-zero if the client reached the soft or the hard limit.
* Otherwise zero is returned. */
int checkClientOutputBufferLimits(redisClient *c) {
int soft = 0, hard = 0, class;
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
class = getClientLimitClass(c);
if (server.client_obuf_limits[class].hard_limit_bytes &&
used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
soft = 1;
/* We need to check if the soft limit is reached continuously for the
* specified amount of seconds. */
if (soft) {
if (c->obuf_soft_limit_reached_time == 0) {
c->obuf_soft_limit_reached_time = server.unixtime;
soft = 0; /* First time we see the soft limit reached */
} else {
time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
if (elapsed <=
server.client_obuf_limits[class].soft_limit_seconds) {
soft = 0; /* The client still did not reached the max number of
seconds for the soft limit to be considered
reached. */
}
}
} else {
c->obuf_soft_limit_reached_time = 0;
}
return soft || hard;
}
/* Asynchronously close a client if soft or hard limit is reached on the
* output buffer size. If the client will be closed 1 is returend, otherwise 0
* is returned.
*
* Note: we need to close the client asynchronously because this function is
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
int asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
if (checkClientOutputBufferLimits(c)) {
sds client = getClientInfoString(c);
freeClientAsync(c);
redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.");
sdsfree(client);
return 1;
} else {
return 0;
}
}
...@@ -926,6 +926,17 @@ void initServerConfig() { ...@@ -926,6 +926,17 @@ void initServerConfig() {
server.repl_serve_stale_data = 1; server.repl_serve_stale_data = 1;
server.repl_down_since = -1; server.repl_down_since = -1;
/* Client output buffer limits */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*256;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*256;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*32;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
/* Double constants initialization */ /* Double constants initialization */
R_Zero = 0.0; R_Zero = 0.0;
R_PosInf = 1.0/R_Zero; R_PosInf = 1.0/R_Zero;
...@@ -1002,6 +1013,7 @@ void initServer() { ...@@ -1002,6 +1013,7 @@ void initServer() {
server.current_client = NULL; server.current_client = NULL;
server.clients = listCreate(); server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate(); server.slaves = listCreate();
server.monitors = listCreate(); server.monitors = listCreate();
server.unblocked_clients = listCreate(); server.unblocked_clients = listCreate();
......
...@@ -142,6 +142,7 @@ ...@@ -142,6 +142,7 @@
server.unblocked_clients */ server.unblocked_clients */
#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */ #define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
#define REDIS_ASKING 1024 /* Client issued the ASKING command */ #define REDIS_ASKING 1024 /* Client issued the ASKING command */
#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */
/* Client request types */ /* Client request types */
#define REDIS_REQ_INLINE 1 #define REDIS_REQ_INLINE 1
...@@ -152,6 +153,7 @@ ...@@ -152,6 +153,7 @@
#define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0 #define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0
#define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1 #define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1
#define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2 #define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2
#define REDIS_CLIENT_LIMIT_NUM_CLASSES 3
/* Slave replication state - slave side */ /* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_NONE 0 /* No active replication */
...@@ -315,6 +317,7 @@ typedef struct redisClient { ...@@ -315,6 +317,7 @@ typedef struct redisClient {
unsigned long reply_bytes; /* Tot bytes of objects in reply list */ unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */ time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int slaveseldb; /* slave selected db, if this client is a slave */ int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */ int authenticated; /* when requirepass is non-NULL */
...@@ -374,6 +377,12 @@ typedef struct zset { ...@@ -374,6 +377,12 @@ typedef struct zset {
zskiplist *zsl; zskiplist *zsl;
} zset; } zset;
typedef struct clientBufferLimitsConfig {
unsigned long hard_limit_bytes;
unsigned long soft_limit_bytes;
time_t soft_limit_seconds;
} clientBufferLimitsConfig;
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Redis cluster data structures * Redis cluster data structures
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
...@@ -526,6 +535,7 @@ struct redisServer { ...@@ -526,6 +535,7 @@ struct redisServer {
int sofd; /* Unix socket file descriptor */ int sofd; /* Unix socket file descriptor */
int cfd; /* Cluster bus lisetning socket */ int cfd; /* Cluster bus lisetning socket */
list *clients; /* List of active clients */ list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */ list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */ redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
...@@ -559,6 +569,7 @@ struct redisServer { ...@@ -559,6 +569,7 @@ struct redisServer {
size_t client_max_querybuf_len; /* Limit for client query buffer length */ size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */ int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */ int daemonize; /* True if running as a daemon */
clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
/* AOF persistence */ /* AOF persistence */
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */ int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */ int aof_fsync; /* Kind of fsync() policy */
...@@ -792,6 +803,8 @@ sds getAllClientsInfoString(void); ...@@ -792,6 +803,8 @@ sds getAllClientsInfoString(void);
void rewriteClientCommandVector(redisClient *c, int argc, ...); void rewriteClientCommandVector(redisClient *c, int argc, ...);
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
unsigned long getClientOutputBufferMemoryUsage(redisClient *c); unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
void freeClientsInAsyncFreeQueue(void);
int asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
#ifdef __GNUC__ #ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册