From abb81c635143dc329eefb5729acec0a62d53cab1 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 11 Feb 2020 17:26:27 +0100 Subject: [PATCH] Tracking: BCAST: registration in the prefix table. --- src/networking.c | 21 +++++++++--------- src/server.h | 9 +++----- src/tracking.c | 57 +++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/src/networking.c b/src/networking.c index 690a134f1..344b76260 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,7 +154,7 @@ client *createClient(connection *conn) { c->peerid = NULL; c->client_list_node = NULL; c->client_tracking_redirection = 0; - c->client_tracking_prefix_nodes = NULL; + c->client_tracking_prefixes = NULL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; @@ -2028,7 +2028,6 @@ int clientSetNameOrReply(client *c, robj *name) { void clientCommand(client *c) { listNode *ln; listIter li; - client *client; if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -2142,7 +2141,7 @@ NULL /* Iterate clients killing all the matching clients. */ listRewind(server.clients,&li); while ((ln = listNext(&li)) != NULL) { - client = listNodeValue(ln); + client *client = listNodeValue(ln); if (addr && strcmp(getClientPeerId(client),addr) != 0) continue; if (type != -1 && getClientType(client) != type) continue; if (id != 0 && client->id != id) continue; @@ -2229,7 +2228,7 @@ NULL size_t numprefix = 0; /* Parse the options. */ - if (for int j = 3; j < argc; j++) { + for (int j = 3; j < c->argc; j++) { int moreargs = (c->argc-1) - j; if (!strcasecmp(c->argv[j]->ptr,"redirect") && moreargs) { @@ -2246,10 +2245,10 @@ NULL } } else if (!strcasecmp(c->argv[j]->ptr,"bcast")) { bcast++; - } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && morearg) { + } else if (!strcasecmp(c->argv[j]->ptr,"prefix") && moreargs) { j++; - prefix = zrealloc(sizeof(robj*)*(numprefix+1)); - prefix[numprefix++] = argv[j]; + prefix = zrealloc(prefix,sizeof(robj*)*(numprefix+1)); + prefix[numprefix++] = c->argv[j]; } else { addReply(c,shared.syntaxerr); return; @@ -2259,16 +2258,16 @@ NULL /* Make sure options are compatible among each other and with the * current state of the client. */ if (!bcast && numprefix) { - addReplyError("PREFIX option requires BCAST mode to be enabled"); + addReplyError(c,"PREFIX option requires BCAST mode to be enabled"); zfree(prefix); return; } - if (client->flags & CLIENT_TRACKING) { - int oldbcast = !!client->flags & CLIENT_TRACKING_BCAST; + if (c->flags & CLIENT_TRACKING) { + int oldbcast = !!c->flags & CLIENT_TRACKING_BCAST; if (oldbcast != bcast) { } - addReplyError( + addReplyError(c, "You can't switch BCAST mode on/off before disabling " "tracking for this client, and then re-enabling it with " "a different mode."); diff --git a/src/server.h b/src/server.h index d3ca0d01b..725c3cbc8 100644 --- a/src/server.h +++ b/src/server.h @@ -823,12 +823,9 @@ typedef struct client { * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; - list *client_tracking_prefix_nodes; /* This list contains listNode pointers - to the nodes we have in every list - of clients in the tracking bcast - table. This way we can remove our - client in O(1) for each list. */ - + rax *client_tracking_prefixes; /* A dictionary of prefixes we are already + subscribed to in BCAST mode, in the + context of client side caching. */ /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; diff --git a/src/tracking.c b/src/tracking.c index 413b21328..9f46275a4 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -49,6 +49,15 @@ uint64_t TrackingTableTotalItems = 0; /* Total number of IDs stored across are using server side for CSC. */ robj *TrackingChannelName; +/* This is the structure that we have as value of the PrefixTable, and + * represents the list of keys modified, and the list of clients that need + * to be notified, for a given prefix. */ +typedef struct bcastState { + rax *keys; /* Keys modified in the current event loop cycle. */ + rax *clients; /* Clients subscribed to the notification events for this + prefix. */ +} bcastState; + /* Remove the tracking state from the client 'c'. Note that there is not much * to do for us here, if not to decrement the counter of the clients in * tracking mode, because we just store the ID of the client in the tracking @@ -56,9 +65,51 @@ robj *TrackingChannelName; * client with many entries in the table is removed, it would cost a lot of * time to do the cleanup. */ void disableTracking(client *c) { + /* If this client is in broadcasting mode, we need to unsubscribe it + * from all the prefixes it is registered to. */ + if (c->flags & CLIENT_TRACKING_BCAST) { + raxIterator ri; + raxStart(&ri,c->client_tracking_prefixes); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + bcastState *bs = raxFind(PrefixTable,ri.key,ri.key_len); + serverAssert(bs != raxNotFound); + raxRemove(bs->clients,(unsigned char*)&c,sizeof(c),NULL); + /* Was it the last client? Remove the prefix from the + * table. */ + if (raxSize(bs->clients) == 0) { + raxFree(bs->clients); + raxFree(bs->keys); + zfree(bs); + raxRemove(PrefixTable,ri.key,ri.key_len,NULL); + } + } + raxStop(&ri); + } + + /* Clear flags and adjust the count. */ if (c->flags & CLIENT_TRACKING) { server.tracking_clients--; - c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR); + c->flags &= ~(CLIENT_TRACKING|CLIENT_TRACKING_BROKEN_REDIR| + CLIENT_TRACKING_BCAST); + } +} + +/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is + * already registered for the specified prefix, no operation is performed. */ +void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { + bcastState *bs = raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); + /* If this is the first client subscribing to such prefix, create + * the prefix in the table. */ + if (bs == raxNotFound) { + bs = zmalloc(sizeof(*bs)); + bs->keys = raxNew(); + bs->clients = raxNew(); + raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); + } + if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + raxInsert(c->client_tracking_prefixes, + (unsigned char*)prefix,plen,NULL,NULL); } } @@ -83,9 +134,9 @@ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, s if (bcast) { c->flags |= CLIENT_TRACKING_BCAST; if (numprefix == 0) enableBcastTrackingForPrefix(c,"",0); - for (int j = 0; j < numprefix; j++) { + for (size_t j = 0; j < numprefix; j++) { sds sdsprefix = prefix[j]->ptr; - enableBcastTrackingForPrefix(c,sdsprefix,sdslen(prefix)); + enableBcastTrackingForPrefix(c,sdsprefix,sdslen(sdsprefix)); } } } -- GitLab