提交 19b06935 编写于 作者: A antirez

Streams: fix XADD API and keyspace notifications.

XADD was suboptimal in the first incarnation of the command, not being
able to accept an ID (very useufl for replication), nor options for
having capped streams.

The keyspace notification for streams was not implemented.
上级 db89f747
......@@ -54,6 +54,7 @@ int keyspaceEventsStringToFlags(char *classes) {
case 'e': flags |= NOTIFY_EVICTED; break;
case 'K': flags |= NOTIFY_KEYSPACE; break;
case 'E': flags |= NOTIFY_KEYEVENT; break;
case 't': flags |= NOTIFY_STREAM; break;
default: return -1;
}
}
......@@ -79,6 +80,7 @@ sds keyspaceEventsFlagsToString(int flags) {
if (flags & NOTIFY_ZSET) res = sdscatlen(res,"z",1);
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
......
......@@ -302,7 +302,7 @@ struct redisCommand redisCommandTable[] = {
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
{"xadd",xaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
{"xadd",xaddCommand,-5,"wmF",0,NULL,1,1,1,0,0},
{"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"xread",xreadCommand,-3,"rs",0,xreadGetKeys,1,1,1,0,0},
......
......@@ -427,7 +427,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define NOTIFY_ZSET (1<<7) /* z */
#define NOTIFY_EXPIRED (1<<8) /* x */
#define NOTIFY_EVICTED (1<<9) /* e */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED) /* A */
#define NOTIFY_STREAM (1<<10) /* t */
#define NOTIFY_ALL (NOTIFY_GENERIC | NOTIFY_STRING | NOTIFY_LIST | NOTIFY_SET | NOTIFY_HASH | NOTIFY_ZSET | NOTIFY_EXPIRED | NOTIFY_EVICTED | NOTIFY_STREAM) /* A flag */
/* Get the first bind addr or NULL */
#define NET_FIRST_BIND_ADDR (server.bindaddr_count ? server.bindaddr[0] : NULL)
......
......@@ -115,8 +115,24 @@ void streamDecodeID(void *buf, streamID *id) {
/* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure. */
void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) {
* Returns the new entry ID populating the 'added_id' structure.
*
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is uesd to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
* The function returns C_OK if the item was added, this is always true
* if the ID was generated by the function. However the function may return
* C_ERR if an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. */
int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, streamID *use_id) {
/* If an ID was given, check that it's greater than the last entry ID
* or return an error. */
if (use_id && (use_id->ms < s->last_id.ms ||
(use_id->ms == s->last_id.ms &&
use_id->seq <= s->last_id.seq))) return C_ERR;
/* Add the new entry. */
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"$",NULL,0);
......@@ -133,7 +149,10 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id)
/* Generate the new entry ID. */
streamID id;
streamNextID(&s->last_id,&id);
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
......@@ -173,6 +192,7 @@ void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id)
s->length++;
s->last_id = id;
if (added_id) *added_id = id;
return C_OK;
}
/* Send the specified range to the client 'c'. The range the client will
......@@ -299,7 +319,9 @@ int string2ull(const char *s, unsigned long long *value) {
* form, just stating the milliseconds time part of the stream. In such a case
* the missing part is set according to the value of 'missing_seq' parameter.
* The IDs "-" and "+" specify respectively the minimum and maximum IDs
* that can be represented. */
* that can be represented.
*
* If 'c' is set to NULL, no reply is sent to the client. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
......@@ -328,13 +350,45 @@ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq)
return C_OK;
invalid:
addReplyError(c,"Invalid stream ID specified as stream command argument");
if (c) addReplyError(c,"Invalid stream ID specified as stream "
"command argument");
return C_ERR;
}
/* XADD key [field value] [field value] ... */
/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
if ((c->argc % 2) == 1) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
/* Parse options. */
int i = 2; /* This is the first argument position where we could
find an option, or the ID. */
for (; i < c->argc; i++) {
int moreargs = i != c->argc-1;
char *opt = c->argv[i]->ptr;
if (opt[0] == '*' && opt[1] == '\0') {
/* This is just a fast path for the common case of auto-ID
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
addReplyError(c,"Sorry, MAXLEN is still not implemented");
i++;
return;
} else {
/* If we are here is a syntax error or a valid ID. */
if (streamParseIDOrReply(NULL,c->argv[i],&id,0) == C_OK) {
id_given = 1;
break;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
}
int field_pos = i+1;
/* Check arity. */
if ((c->argc - field_pos) < 2 || (c->argc-field_pos % 2) == 1) {
addReplyError(c,"wrong number of arguments for XADD");
return;
}
......@@ -346,13 +400,19 @@ void xaddCommand(client *c) {
s = o->ptr;
/* Append using the low level function and return the ID. */
streamID id;
streamAppendItem(s,c->argv+2,(c->argc-2)/2,&id);
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL)
== C_ERR)
{
addReplyError(c,"The ID specified in XADD is smaller than the "
"target stream top item");
return;
}
sds reply = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq);
addReplySds(c,reply);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
if (server.blocked_clients_by_type[BLOCKED_STREAM])
signalKeyAsReady(c->db, c->argv[1]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册