未验证 提交 f1f259de 编写于 作者: S Salvatore Sanfilippo 提交者: GitHub

Merge pull request #6547 from guybe7/module_api_streams

Support streams in general module API functions
......@@ -518,7 +518,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) {
case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
case OBJ_SET: isempty = setTypeSize(o) == 0; break;
case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
case OBJ_HASH: isempty = hashTypeLength(o) == 0; break;
case OBJ_STREAM: isempty = streamLength(o) == 0; break;
default: isempty = 0;
}
......@@ -1965,6 +1966,7 @@ int RM_KeyType(RedisModuleKey *key) {
case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM;
default: return 0;
}
}
......@@ -1982,6 +1984,7 @@ size_t RM_ValueLength(RedisModuleKey *key) {
case OBJ_SET: return setTypeSize(key->value);
case OBJ_ZSET: return zsetLength(key->value);
case OBJ_HASH: return hashTypeLength(key->value);
case OBJ_STREAM: return streamLength(key->value);
default: return 0;
}
}
......
......@@ -33,6 +33,7 @@
#define REDISMODULE_KEYTYPE_SET 4
#define REDISMODULE_KEYTYPE_ZSET 5
#define REDISMODULE_KEYTYPE_MODULE 6
#define REDISMODULE_KEYTYPE_STREAM 7
/* Reply types. */
#define REDISMODULE_REPLY_UNKNOWN -1
......
......@@ -98,6 +98,7 @@ struct client;
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
......
......@@ -67,6 +67,12 @@ void freeStream(stream *s) {
zfree(s);
}
/* Return the length of a stream. */
unsigned long streamLength(const robj *subject) {
stream *s = subject->ptr;
return s->length;
}
/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册