Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Turbo码先生
redis
提交
fa61720d
R
redis
项目概览
Turbo码先生
/
redis
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
redis
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
fa61720d
编写于
9月 07, 2017
作者:
A
antirez
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Streams: XREAD, first draft. Handling of blocked clients still missing.
上级
e65b4825
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
46 addition
and
10 deletion
+46
-10
src/t_stream.c
src/t_stream.c
+46
-10
未找到文件。
src/t_stream.c
浏览文件 @
fa61720d
...
...
@@ -398,10 +398,10 @@ void xlenCommand(client *c) {
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
* key_N ID_N */
void
xreadCommand
(
client
*
c
)
{
long
long
block
=
0
;
long
long
timeout
=
0
;
long
long
count
=
0
;
int
streams_count
=
0
;
int
streams_arg
c
=
0
;
int
streams_arg
=
0
;
#define STREAMID_STATIC_VECTOR_LEN 8
streamID
static_ids
[
STREAMID_STATIC_VECTOR_LEN
];
streamID
*
ids
=
static_ids
;
...
...
@@ -412,17 +412,17 @@ void xreadCommand(client *c) {
char
*
o
=
c
->
argv
[
i
]
->
ptr
;
if
(
!
strcasecmp
(
o
,
"BLOCK"
)
&&
moreargs
)
{
i
++
;
if
(
getLongLongFromObjectOrReply
(
c
,
c
->
argv
[
i
],
&
block
,
NULL
)
!=
C_OK
)
return
;
if
(
block
<
0
)
block
=
0
;
if
(
getLongLongFromObjectOrReply
(
c
,
c
->
argv
[
i
],
&
timeout
,
NULL
)
!=
C_OK
)
return
;
if
(
timeout
<
0
)
timeout
=
0
;
}
else
if
(
!
strcasecmp
(
o
,
"COUNT"
)
&&
moreargs
)
{
i
++
;
if
(
getLongLongFromObjectOrReply
(
c
,
c
->
argv
[
i
],
&
count
,
NULL
)
!=
C_OK
)
return
;
if
(
count
<
0
)
count
=
0
;
}
else
if
(
!
strcasecmp
(
o
,
"STREAMS"
)
&&
moreargs
)
{
streams_arg
c
=
i
+
1
;
streams_count
=
(
c
->
argc
-
streams_arg
c
);
streams_arg
=
i
+
1
;
streams_count
=
(
c
->
argc
-
streams_arg
);
if
((
streams_count
%
2
)
!=
0
)
{
addReplyError
(
c
,
"Unbalanced XREAD list of streams: "
"for each stream key an ID or '$' must be "
...
...
@@ -438,7 +438,7 @@ void xreadCommand(client *c) {
}
/* STREAMS option is mandatory. */
if
(
streams_arg
c
==
0
)
{
if
(
streams_arg
==
0
)
{
addReply
(
c
,
shared
.
syntaxerr
);
return
;
}
...
...
@@ -447,8 +447,7 @@ void xreadCommand(client *c) {
if
(
streams_count
>
STREAMID_STATIC_VECTOR_LEN
)
ids
=
zmalloc
(
sizeof
(
streamID
)
*
streams_count
);
/* Try to serve the client synchronously. */
for
(
int
i
=
streams_argc
+
streams_count
;
i
<
c
->
argc
;
i
++
)
{
for
(
int
i
=
streams_arg
+
streams_count
;
i
<
c
->
argc
;
i
++
)
{
/* Specifying "$" as last-known-id means that the client wants to be
* served with just the messages that will arrive into the stream
* starting from now. */
...
...
@@ -466,6 +465,43 @@ void xreadCommand(client *c) {
if
(
streamParseIDOrReply
(
c
,
c
->
argv
[
i
],
ids
+
i
,
0
)
!=
C_OK
)
goto
cleanup
;
}
/* Try to serve the client synchronously. */
for
(
int
i
=
0
;
i
<
streams_count
;
i
++
)
{
robj
*
o
=
lookupKeyRead
(
c
->
db
,
c
->
argv
[
i
+
streams_arg
]);
if
(
o
==
NULL
)
continue
;
stream
*
s
=
o
->
ptr
;
streamID
*
gt
=
ids
+
i
;
/* ID must be greater than this. */
if
(
s
->
last_id
.
ms
>
gt
->
ms
||
(
s
->
last_id
.
ms
==
gt
->
ms
&&
s
->
last_id
.
seq
>
gt
->
seq
))
{
/* streamReplyWithRange() handles the 'start' ID as inclusive,
* so start from the next ID, since we want only messages with
* IDs greater than start. */
streamID
start
=
*
gt
;
start
.
seq
++
;
/* Can't overflow, it's an uint64_t */
streamReplyWithRange
(
c
,
s
,
&
start
,
NULL
,
count
);
goto
cleanup
;
}
}
/* Block if needed. */
if
(
timeout
)
{
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if
(
c
->
flags
&
CLIENT_MULTI
)
{
addReply
(
c
,
shared
.
nullmultibulk
);
goto
cleanup
;
}
blockForKeys
(
c
,
BLOCKED_STREAM
,
c
->
argv
+
streams_arg
,
streams_count
,
timeout
,
NULL
,
ids
);
goto
cleanup
;
}
/* No BLOCK option, nor any stream we can serve. Reply as with a
* timeout happened. */
addReply
(
c
,
shared
.
nullmultibulk
);
/* Continue to cleanup... */
cleanup:
/* Cleanup. */
if
(
ids
!=
static_ids
)
zfree
(
ids
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录