diff --git a/src/t_stream.c b/src/t_stream.c index a8230109cf4ffbcc73e301eb3c177cdf848a29db..a1d3f8a17a8a4a8231cecfcb8cbd79f5ad751518 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -196,6 +196,120 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, return C_OK; } +/* We define an iterator to iterate stream items in an abstract way, without + * caring about the radix tree + listpack representation. Technically speaking + * the iterator is only used inside streamReplyWithRange(), so could just + * be implemented inside the function, but practically there is the AOF + * rewriting code that also needs to iterate the stream to emit the XADD + * commands. */ +typedef struct streamIterator { + uint64_t start_key[2]; /* Start key as 128 bit big endian. */ + uint64_t end_key[2]; /* End key as 128 bit big endian. */ + raxIterator ri; /* Rax iterator. */ + unsigned char *lp; /* Current listpack. */ + unsigned char *lp_ele; /* Current listpack cursor. */ +} streamIterator; + +/* Initialize the stream iterator, so that we can call iterating functions + * to get the next items. This requires a corresponding streamIteratorStop() + * at the end. + * + * Once the iterator is initalized, we iterate like this: + * + * streamIterator myiterator; + * streamIteratorStart(&myiterator,...); + * size_t numfields; + * while(streamIteratorGetID(&myitereator,&ID,&numfields)) { + * while(numfields--) { + * unsigned char *key, *value; + * size_t key_len, value_len; + * streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len); + * + * ... do what you want with key and value ... + * } + * } + * streamIteratorStop(&myiterator); */ +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) { + /* Intialize the iterator and translates the iteration start/stop + * elements into a 128 big big-endian number. */ + streamEncodeID(si->start_key,start); + if (end) { + streamEncodeID(si->end_key,end); + } else { + /* We assume that UINT64_MAX is the same in little and big + * endian, that is, all bits set. */ + si->end_key[0] = UINT64_MAX; + si->end_key[0] = UINT64_MAX; + } + raxStart(&si->ri,s->rax); + + /* Seek the correct node in the radix tree. */ + if (start->ms || start->seq) { + raxSeek(&si->ri,"<=",(unsigned char*)si->start_key, + sizeof(si->start_key)); + if (raxEOF(&si->ri)) + raxSeek(&si->ri,">",(unsigned char*)si->start_key, + sizeof(si->start_key)); + } else { + raxSeek(&si->ri,"^",NULL,0); + } + si->lp = NULL; /* There is no current listpack right now. */ + si->lp_ele = NULL; /* Current listpack cursor. */ +} + +/* Return 1 and store the current item ID at 'id' if there are still + * elements within the iteration range, otherwise return 0 in order to + * signal the iteration terminated. */ +int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) { + while(1) { /* Will stop when element > stop_key or end of radix tree. */ + /* If the current listpack is set to NULL, this is the start of the + * iteration or the previous listpack was completely iterated. + * Go to the next node. */ + if (si->lp == NULL || si->lp_ele == NULL) { + if (!raxNext(&si->ri)) return 0; + serverAssert(si->ri.key_len == sizeof(streamID)); + si->lp = si->ri.data; + si->lp_ele = lpFirst(si->lp); + } + + /* For every radix tree node, iterate the corresponding listpack, + * returning elements when they are within range. */ + while(si->lp_ele) { + int64_t e_len; + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(si->lp_ele,&e_len,buf); + serverAssert(e_len == sizeof(streamID)); + + /* Go to next field: number of elements. */ + si->lp_ele = lpNext(si->lp,si->lp_ele); + + /* If current >= start */ + if (memcmp(e,si->start_key,sizeof(streamID)) >= 0) { + if (memcmp(e,si->end_key,sizeof(streamID)) > 0) + return 0; /* We are already out of range. */ + streamDecodeID(e,id); + *numfields = lpGetInteger(si->lp_ele); + return 1; /* Valid item returned. */ + } else { + /* If we do not emit, we have to discard. */ + int64_t numfields = lpGetInteger(si->lp_ele); + si->lp_ele = lpNext(si->lp,si->lp_ele); + for (int64_t i = 0; i < numfields*2; i++) + si->lp_ele = lpNext(si->lp,si->lp_ele); + } + } + + /* End of listpack reached. Try the next radix tree node. */ + } +} + +/* Stop the stream iterator. The only cleanup we need is to free the rax + * itereator, since the stream iterator itself is supposed to be stack + * allocated. */ +void streamIteratorStop(streamIterator *si) { + raxStop(&si->ri); +} + /* Send the specified range to the client 'c'. The range the client will * receive is between start and end inclusive, if 'count' is non zero, no more * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that