提交 294bcfc4 编写于 作者: A antirez

PubSub clients refactoring and new PUBSUB flag.

The code tested many times if a client had active Pub/Sub subscriptions
by checking the length of a list and dictionary where the patterns and
channels are stored. This was substituted with a client flag called
REDIS_PUBSUB that is simpler to test for. Moreover in order to manage
this flag some code was refactored.

This commit is believed to have no effects in the behavior of the
server.
上级 0c29cadd
...@@ -1527,7 +1527,7 @@ unsigned long getClientOutputBufferMemoryUsage(redisClient *c) { ...@@ -1527,7 +1527,7 @@ unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
int getClientType(redisClient *c) { int getClientType(redisClient *c) {
if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR)) if ((c->flags & REDIS_SLAVE) && !(c->flags & REDIS_MONITOR))
return REDIS_CLIENT_TYPE_SLAVE; return REDIS_CLIENT_TYPE_SLAVE;
if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns)) if (c->flags & REDIS_PUBSUB)
return REDIS_CLIENT_TYPE_PUBSUB; return REDIS_CLIENT_TYPE_PUBSUB;
return REDIS_CLIENT_TYPE_NORMAL; return REDIS_CLIENT_TYPE_NORMAL;
} }
......
...@@ -47,6 +47,12 @@ int listMatchPubsubPattern(void *a, void *b) { ...@@ -47,6 +47,12 @@ int listMatchPubsubPattern(void *a, void *b) {
(equalStringObjects(pa->pattern,pb->pattern)); (equalStringObjects(pa->pattern,pb->pattern));
} }
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(redisClient *c) {
return dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns);
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */ * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(redisClient *c, robj *channel) { int pubsubSubscribeChannel(redisClient *c, robj *channel) {
...@@ -73,7 +79,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) { ...@@ -73,7 +79,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) {
addReply(c,shared.mbulkhdr[3]); addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk); addReply(c,shared.subscribebulk);
addReplyBulk(c,channel); addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); addReplyLongLong(c,clientSubscriptionsCount(c));
return retval; return retval;
} }
...@@ -135,7 +141,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) { ...@@ -135,7 +141,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) {
addReply(c,shared.mbulkhdr[3]); addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk); addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern); addReplyBulk(c,pattern);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); addReplyLongLong(c,clientSubscriptionsCount(c));
return retval; return retval;
} }
...@@ -168,7 +174,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { ...@@ -168,7 +174,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
} }
/* Unsubscribe from all the channels. Return the number of channels the /* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed from. */ * client was subscribed to. */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de; dictEntry *de;
...@@ -273,6 +279,7 @@ void subscribeCommand(redisClient *c) { ...@@ -273,6 +279,7 @@ void subscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]); pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
} }
void unsubscribeCommand(redisClient *c) { void unsubscribeCommand(redisClient *c) {
...@@ -284,6 +291,7 @@ void unsubscribeCommand(redisClient *c) { ...@@ -284,6 +291,7 @@ void unsubscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1); pubsubUnsubscribeChannel(c,c->argv[j],1);
} }
if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
} }
void psubscribeCommand(redisClient *c) { void psubscribeCommand(redisClient *c) {
...@@ -291,6 +299,7 @@ void psubscribeCommand(redisClient *c) { ...@@ -291,6 +299,7 @@ void psubscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]); pubsubSubscribePattern(c,c->argv[j]);
c->flags |= REDIS_PUBSUB;
} }
void punsubscribeCommand(redisClient *c) { void punsubscribeCommand(redisClient *c) {
...@@ -302,6 +311,7 @@ void punsubscribeCommand(redisClient *c) { ...@@ -302,6 +311,7 @@ void punsubscribeCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1); pubsubUnsubscribePattern(c,c->argv[j],1);
} }
if (clientSubscriptionsCount(c) == 0) c->flags &= ~REDIS_PUBSUB;
} }
void publishCommand(redisClient *c) { void publishCommand(redisClient *c) {
......
...@@ -856,8 +856,7 @@ int clientsCronHandleTimeout(redisClient *c) { ...@@ -856,8 +856,7 @@ int clientsCronHandleTimeout(redisClient *c) {
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */
!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ !(c->flags & REDIS_PUBSUB) && /* no timeout for Pub/Sub clients */
listLength(c->pubsub_patterns) == 0 &&
(now - c->lastinteraction > server.maxidletime)) (now - c->lastinteraction > server.maxidletime))
{ {
redisLog(REDIS_VERBOSE,"Closing idle client"); redisLog(REDIS_VERBOSE,"Closing idle client");
...@@ -2078,8 +2077,8 @@ int processCommand(redisClient *c) { ...@@ -2078,8 +2077,8 @@ int processCommand(redisClient *c) {
} }
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) if (c->flags & REDIS_PUBSUB &&
&& c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand && c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand && c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand && c->cmd->proc != psubscribeCommand &&
......
...@@ -239,6 +239,8 @@ typedef long long mstime_t; /* millisecond time type. */ ...@@ -239,6 +239,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define REDIS_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */ #define REDIS_FORCE_AOF (1<<14) /* Force AOF propagation of current cmd. */
#define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */ #define REDIS_FORCE_REPL (1<<15) /* Force replication of current cmd. */
#define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */ #define REDIS_PRE_PSYNC (1<<16) /* Instance don't understand PSYNC. */
#define REDIS_READONLY (1<<17) /* Cluster client is in read-only state. */
#define REDIS_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
/* Client request types */ /* Client request types */
#define REDIS_REQ_INLINE 1 #define REDIS_REQ_INLINE 1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册