提交 6fb1aa23 编写于 作者: A antirez

Tracking: BCAST: basic feature now works.

上级 d4fe79a1
......@@ -2224,7 +2224,7 @@ NULL
* [PREFIX second] ... */
long long redir = 0;
int bcast = 0;
robj **prefix;
robj **prefix = NULL;
size_t numprefix = 0;
/* Parse the options. */
......
......@@ -2124,6 +2124,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
......
......@@ -85,6 +85,8 @@ void disableTracking(client *c) {
}
}
raxStop(&ri);
raxFree(c->client_tracking_prefixes);
c->client_tracking_prefixes = NULL;
}
/* Clear flags and adjust the count. */
......@@ -108,6 +110,8 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
raxInsert(PrefixTable,(unsigned char*)prefix,plen,bs,NULL);
}
if (raxTryInsert(bs->clients,(unsigned char*)&c,sizeof(c),NULL,NULL)) {
if (c->client_tracking_prefixes == NULL)
c->client_tracking_prefixes = raxNew();
raxInsert(c->client_tracking_prefixes,
(unsigned char*)prefix,plen,NULL,NULL);
}
......@@ -121,10 +125,10 @@ void enableBcastTrackingForPrefix(client *c, char *prefix, size_t plen) {
* inform it of the condition. Multiple clients can redirect the invalidation
* messages to the same client ID. */
void enableTracking(client *c, uint64_t redirect_to, int bcast, robj **prefix, size_t numprefix) {
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
c->flags |= CLIENT_TRACKING;
c->flags &= ~CLIENT_TRACKING_BROKEN_REDIR;
c->flags &= ~(CLIENT_TRACKING_BROKEN_REDIR|CLIENT_TRACKING_BCAST);
c->client_tracking_redirection = redirect_to;
if (!(c->flags & CLIENT_TRACKING)) server.tracking_clients++;
if (TrackingTable == NULL) {
TrackingTable = raxNew();
PrefixTable = raxNew();
......@@ -229,10 +233,11 @@ void trackingRememberKeyToBroadcast(char *keyname, size_t keylen) {
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
if (keylen > ri.key_len) continue;
if (memcmp(ri.key,keyname,ri.key_len) != 0) continue;
bcastState *bs = ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
if (ri.key_len > keylen) continue;
if (ri.key_len != 0 && memcmp(ri.key,keyname,ri.key_len) != 0)
continue;
bcastState *bs = ri.data;
raxTryInsert(bs->keys,(unsigned char*)keyname,keylen,NULL,NULL);
}
raxStop(&ri);
}
......@@ -362,46 +367,52 @@ void trackingLimitUsedSlots(void) {
* notifications to each client in each prefix. */
void trackingBroadcastInvalidationMessages(void) {
raxIterator ri, ri2;
/* Return ASAP if there is nothing to do here. */
if (TrackingTable == NULL || !server.tracking_clients) return;
raxStart(&ri,PrefixTable);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
bcastState *bs = ri.data;
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
raxStart(&ri2,bs->keys);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
sds proto = sdsnewlen("$",1);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
if (raxSize(bs->keys)) {
/* Create the array reply with the list of keys once, then send
* it to all the clients subscribed to this prefix. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),raxSize(bs->keys));
sds proto = sdsempty();
proto = sdsMakeRoomFor(proto,raxSize(bs->keys)*15);
proto = sdscatlen(proto,"*",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri2);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
sendTrackingMessage(c,proto,sdslen(proto),1);
}
raxStop(&ri2);
raxStart(&ri2,bs->keys);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
len = ll2string(buf,sizeof(buf),ri2.key_len);
proto = sdscatlen(proto,"$",1);
proto = sdscatlen(proto,buf,len);
proto = sdscatlen(proto,"\r\n",2);
proto = sdscatlen(proto,ri2.key,ri2.key_len);
proto = sdscatlen(proto,"\r\n",2);
}
raxStop(&ri2);
/* Send this array of keys to every client in the list. */
raxStart(&ri2,bs->clients);
raxSeek(&ri2,"^",NULL,0);
while(raxNext(&ri2)) {
client *c;
memcpy(&c,ri2.key,sizeof(c));
sendTrackingMessage(c,proto,sdslen(proto),1);
}
raxStop(&ri2);
/* Clean up: we can remove everything from this state, because we
* want to only track the new keys that will be accumulated starting
* from now. */
sdsfree(proto);
raxFree(bs->clients);
/* Clean up: we can remove everything from this state, because we
* want to only track the new keys that will be accumulated starting
* from now. */
sdsfree(proto);
}
raxFree(bs->keys);
bs->clients = raxNew();
bs->keys = raxNew();
}
raxStop(&ri);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册