From 6fb1aa2381d321203d1cfceb0d0779ae2f469d8a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Feb 2020 19:22:04 +0100 Subject: [PATCH] Tracking: BCAST: basic feature now works. --- src/networking.c | 2 +- src/server.c | 4 +++ src/tracking.c | 89 +++++++++++++++++++++++++++--------------------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/networking.c b/src/networking.c index 344b76260..46534253e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2224,7 +2224,7 @@ NULL * [PREFIX second] ... */ long long redir = 0; int bcast = 0; - robj **prefix; + robj **prefix = NULL; size_t numprefix = 0; /* Parse the options. */ diff --git a/src/server.c b/src/server.c index 1001fa4f7..22c81070c 100644 --- a/src/server.c +++ b/src/server.c @@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (listLength(server.unblocked_clients)) processUnblockedClients(); + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); diff --git a/src/tracking.c b/src/tracking.c index 345c5f1ad..672b886a3 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -85,6 +85,8 @@ void disableTracking(client *c) { } } raxStop(&ri); + raxFree(c->client_tracking_prefixes); + c->client_tracking_prefixes = NULL; } /* Clear flags and adjust the count. */ @@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL); } if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) { + if (c->client_tracking_prefixes == NULL) + c->client_tracking_prefixes = raxNew(); raxInsert(c->client_tracking_prefixes, (unsigned char*)prefix,plen,NULL,NULL); } @@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) { * inform it of the condition. Multiple clients can redirect the invalidation * messages to the same client ID. */ void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) { + if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; c->flags |= CLIENT_TRACKING; - c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR; + c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST); c->client_tracking_redirection = redirect_to; - if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++; if (TrackingTable == NULL) { TrackingTable = raxNew(); PrefixTable = raxNew(); @@ -229,10 +233,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) { raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { - if (keylen > ri.key_len) continue; - if (memcmp(ri.key,keyname,ri.key_len) != 0) continue; - bcastState *bs = ri.data; - raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); + if (ri.key_len > keylen) continue; + if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0) + continue; + bcastState *bs = ri.data; + raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL); } raxStop(&ri); } @@ -362,46 +367,52 @@ void trackingLimitUsedSlots(void) { * notifications to each client in each prefix. */ void trackingBroadcastInvalidationMessages(void) { raxIterator ri, ri2; + + /* Return ASAP if there is nothing to do here. */ + if (TrackingTable == NULL || !server.tracking_clients) return; + raxStart(&ri,PrefixTable); raxSeek(&ri,"^",NULL,0); while(raxNext(&ri)) { bcastState *bs = ri.data; - /* Create the array reply with the list of keys once, then send - * it to all the clients subscribed to this prefix. */ - char buf[32]; - size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); - sds proto = sdsempty(); - proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); - proto = sdscatlen(proto,"*",1); - proto = sdscatlen(proto,buf,len); - proto = sdscatlen(proto,"\r\n",2); - raxStart(&ri2,bs->keys); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - len = ll2string(buf,sizeof(buf),ri2.key_len); - sds proto = sdsnewlen("$",1); - proto = sdscatlen(proto,ri2.key,ri2.key_len); + if (raxSize(bs->keys)) { + /* Create the array reply with the list of keys once, then send + * it to all the clients subscribed to this prefix. */ + char buf[32]; + size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys)); + sds proto = sdsempty(); + proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15); + proto = sdscatlen(proto,"*",1); + proto = sdscatlen(proto,buf,len); proto = sdscatlen(proto,"\r\n",2); - } - raxStop(&ri2); - - /* Send this array of keys to every client in the list. */ - raxStart(&ri2,bs->clients); - raxSeek(&ri2,"^",NULL,0); - while(raxNext(&ri2)) { - client *c; - memcpy(&c,ri2.key,sizeof(c)); - sendTrackingMessage(c,proto,sdslen(proto),1); - } - raxStop(&ri2); + raxStart(&ri2,bs->keys); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + len = ll2string(buf,sizeof(buf),ri2.key_len); + proto = sdscatlen(proto,"$",1); + proto = sdscatlen(proto,buf,len); + proto = sdscatlen(proto,"\r\n",2); + proto = sdscatlen(proto,ri2.key,ri2.key_len); + proto = sdscatlen(proto,"\r\n",2); + } + raxStop(&ri2); + + /* Send this array of keys to every client in the list. */ + raxStart(&ri2,bs->clients); + raxSeek(&ri2,"^",NULL,0); + while(raxNext(&ri2)) { + client *c; + memcpy(&c,ri2.key,sizeof(c)); + sendTrackingMessage(c,proto,sdslen(proto),1); + } + raxStop(&ri2); - /* Clean up: we can remove everything from this state, because we - * want to only track the new keys that will be accumulated starting - * from now. */ - sdsfree(proto); - raxFree(bs->clients); + /* Clean up: we can remove everything from this state, because we + * want to only track the new keys that will be accumulated starting + * from now. */ + sdsfree(proto); + } raxFree(bs->keys); - bs->clients = raxNew(); bs->keys = raxNew(); } raxStop(&ri); -- GitLab