diff --git a/src/t_stream.c b/src/t_stream.c index 155167af92d29f9103befdba639c41782c7230af..508c222250cf440a87f4778973a8c983447acc19 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2475,16 +2475,200 @@ void xtrimCommand(client *c) { addReplyLongLong(c,deleted); } +/* Helper function for xinfoCommand. + * Handles the variants of XINFO STREAM */ +void xinfoReplyWithStreamInfo(client *c, stream *s) { + int full = 1; + long long count = 0; + robj **optv = c->argv + 3; /* Options start after XINFO STREAM */ + int optc = c->argc - 3; + + /* Parse options. */ + if (optc == 0) { + full = 0; + } else { + /* Valid options are [FULL] or [FULL COUNT ] */ + if (optc != 1 && optc != 3) { + addReplySubcommandSyntaxError(c); + return; + } + + /* First option must be "FULL" */ + if (strcasecmp(optv[0]->ptr,"full")) { + addReplySubcommandSyntaxError(c); + return; + } + + if (optc == 3) { + /* First option must be "FULL" */ + if (strcasecmp(optv[1]->ptr,"count")) { + addReplySubcommandSyntaxError(c); + return; + } + if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR) + return; + if (count < 0) count = 0; + } + } + + addReplyMapLen(c,full ? 6 : 7); + addReplyBulkCString(c,"length"); + addReplyLongLong(c,s->length); + addReplyBulkCString(c,"radix-tree-keys"); + addReplyLongLong(c,raxSize(s->rax)); + addReplyBulkCString(c,"radix-tree-nodes"); + addReplyLongLong(c,s->rax->numnodes); + addReplyBulkCString(c,"last-generated-id"); + addReplyStreamID(c,&s->last_id); + + if (!full) { + /* XINFO STREAM */ + + addReplyBulkCString(c,"groups"); + addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); + + /* To emit the first/last entry we use streamReplyWithRange(). */ + int emitted; + streamID start, end; + start.ms = start.seq = 0; + end.ms = end.seq = UINT64_MAX; + addReplyBulkCString(c,"first-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + addReplyBulkCString(c,"last-entry"); + emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReplyNull(c); + } else { + /* XINFO STREAM FULL [COUNT ] */ + + /* Stream entries */ + addReplyBulkCString(c,"entries"); + streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL); + + /* Consumer groups */ + addReplyBulkCString(c,"groups"); + if (s->cgroups == NULL) { + addReplyArrayLen(c,0); + } else { + addReplyArrayLen(c,raxSize(s->cgroups)); + raxIterator ri_cgroups; + raxStart(&ri_cgroups,s->cgroups); + raxSeek(&ri_cgroups,"^",NULL,0); + while(raxNext(&ri_cgroups)) { + streamCG *cg = ri_cgroups.data; + addReplyMapLen(c,5); + + /* Name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len); + + /* Last delivered ID */ + addReplyBulkCString(c,"last-delivered-id"); + addReplyStreamID(c,&cg->last_id); + + /* Group PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(cg->pel)); + + /* Group PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cg_pel = 0; + void *arrayptr_cg_pel = addReplyDeferredLen(c); + raxIterator ri_cg_pel; + raxStart(&ri_cg_pel,cg->pel); + raxSeek(&ri_cg_pel,"^",NULL,0); + while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) { + streamNACK *nack = ri_cg_pel.data; + addReplyArrayLen(c,4); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cg_pel.key,&id); + addReplyStreamID(c,&id); + + /* Consumer name. */ + addReplyBulkCBuffer(c,nack->consumer->name, + sdslen(nack->consumer->name)); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cg_pel++; + } + setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel); + raxStop(&ri_cg_pel); + + /* Consumers */ + addReplyBulkCString(c,"consumers"); + addReplyArrayLen(c,raxSize(cg->consumers)); + raxIterator ri_consumers; + raxStart(&ri_consumers,cg->consumers); + raxSeek(&ri_consumers,"^",NULL,0); + while(raxNext(&ri_consumers)) { + streamConsumer *consumer = ri_consumers.data; + addReplyMapLen(c,4); + + /* Consumer name */ + addReplyBulkCString(c,"name"); + addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + + /* Seen-time */ + addReplyBulkCString(c,"seen-time"); + addReplyLongLong(c,consumer->seen_time); + + /* Consumer PEL count */ + addReplyBulkCString(c,"pel-count"); + addReplyLongLong(c,raxSize(consumer->pel)); + + /* Consumer PEL */ + addReplyBulkCString(c,"pending"); + long long arraylen_cpel = 0; + void *arrayptr_cpel = addReplyDeferredLen(c); + raxIterator ri_cpel; + raxStart(&ri_cpel,consumer->pel); + raxSeek(&ri_cpel,"^",NULL,0); + while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) { + streamNACK *nack = ri_cpel.data; + addReplyArrayLen(c,3); + + /* Entry ID. */ + streamID id; + streamDecodeID(ri_cpel.key,&id); + addReplyStreamID(c,&id); + + /* Last delivery. */ + addReplyLongLong(c,nack->delivery_time); + + /* Number of deliveries. */ + addReplyLongLong(c,nack->delivery_count); + + arraylen_cpel++; + } + setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel); + raxStop(&ri_cpel); + } + raxStop(&ri_consumers); + } + raxStop(&ri_cgroups); + } + } +} + /* XINFO CONSUMERS * XINFO GROUPS - * XINFO STREAM + * XINFO STREAM [FULL [COUNT ]] * XINFO HELP. */ void xinfoCommand(client *c) { const char *help[] = { -"CONSUMERS -- Show consumer groups of group .", -"GROUPS -- Show the stream consumer groups.", -"STREAM -- Show information about the stream.", -"HELP -- Print this help.", +"CONSUMERS -- Show consumer groups of group .", +"GROUPS -- Show the stream consumer groups.", +"STREAM [FULL [COUNT ]] -- Show information about the stream.", +"HELP -- Print this help.", NULL }; stream *s = NULL; @@ -2564,36 +2748,10 @@ NULL addReplyStreamID(c,&cg->last_id); } raxStop(&ri); - } else if (!strcasecmp(opt,"STREAM") && c->argc == 3) { - /* XINFO STREAM (or the alias XINFO ). */ - addReplyMapLen(c,7); - addReplyBulkCString(c,"length"); - addReplyLongLong(c,s->length); - addReplyBulkCString(c,"radix-tree-keys"); - addReplyLongLong(c,raxSize(s->rax)); - addReplyBulkCString(c,"radix-tree-nodes"); - addReplyLongLong(c,s->rax->numnodes); - addReplyBulkCString(c,"groups"); - addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0); - addReplyBulkCString(c,"last-generated-id"); - addReplyStreamID(c,&s->last_id); - - /* To emit the first/last entry we us the streamReplyWithRange() - * API. */ - int count; - streamID start, end; - start.ms = start.seq = 0; - end.ms = end.seq = UINT64_MAX; - addReplyBulkCString(c,"first-entry"); - count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReplyNull(c); - addReplyBulkCString(c,"last-entry"); - count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); - if (!count) addReplyNull(c); + } else if (!strcasecmp(opt,"STREAM")) { + /* XINFO STREAM [FULL [COUNT ]]. */ + xinfoReplyWithStreamInfo(c,s); } else { addReplySubcommandSyntaxError(c); } } - diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 04661707bc3c5470dd69605207455ae4e06f053d..dfcd735f6d742529585028df9b80b8aeb16e99b9 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -294,6 +294,40 @@ start_server { assert {[lindex $reply 0 3] == 2} } + test {XINFO FULL output} { + r del x + r XADD x 100 a 1 + r XADD x 101 b 1 + r XADD x 102 c 1 + r XADD x 103 e 1 + r XADD x 104 f 1 + r XGROUP CREATE x g1 0 + r XGROUP CREATE x g2 0 + r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x > + r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x > + r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x > + r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x > + r XDEL x 103 + + set reply [r XINFO STREAM x FULL] + assert_equal [llength $reply] 12 + assert_equal [lindex $reply 1] 4 ;# stream length + assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries + assert_equal [lindex $reply 11 0 1] "g1" ;# first group name + assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL + assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer + assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL + assert_equal [lindex $reply 11 1 1] "g2" ;# second group name + assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer + assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL + assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL + + set reply [r XINFO STREAM x FULL COUNT 1] + assert_equal [llength $reply] 12 + assert_equal [lindex $reply 1] 4 + assert_equal [lindex $reply 9] "{100-0 {a 1}}" + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host]