Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
04f37846
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
04f37846
编写于
5月 31, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change log level
上级
ed851b52
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
8 addition
and
8 deletion
+8
-8
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+7
-7
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-1
未找到文件。
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
04f37846
...
@@ -1026,10 +1026,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1026,10 +1026,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
if (err != NULL) { \
taosMemoryFree(err); \
taosMemoryFree(err); \
q
Debug
("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
q
Error
("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \
code = -1; \
} else { \
} else { \
q
Debug
("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
q
Trace
("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \
} \
} \
taosMemoryFree(ttlV); \
taosMemoryFree(ttlV); \
} while (0);
} while (0);
...
@@ -1056,10 +1056,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1056,10 +1056,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (val == NULL) { \
if (err == NULL) { \
if (err == NULL) { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
q
Trace
("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
funcname); \
funcname); \
} else { \
} else { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
q
Error
("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
err); \
taosMemoryFreeClear(err); \
taosMemoryFreeClear(err); \
} \
} \
...
@@ -1068,11 +1068,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1068,11 +1068,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* p = NULL; \
char* p = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (len < 0) { \
if (len < 0) { \
q
Debug
("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
q
Error
("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
funcname); \
funcname); \
code = -1; \
code = -1; \
} else { \
} else { \
q
Debug
("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
q
Trace
("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
len); \
len); \
} \
} \
taosMemoryFree(val); \
taosMemoryFree(val); \
...
@@ -1107,7 +1107,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1107,7 +1107,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(err); \
taosMemoryFree(err); \
code = -1; \
code = -1; \
} else { \
} else { \
q
Debug
("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \
q
Trace
("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} \
} \
} while (0);
} while (0);
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
04f37846
...
@@ -157,7 +157,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
...
@@ -157,7 +157,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
tEncodeStreamRetrieveReq
(
&
encoder
,
&
req
);
tEncodeStreamRetrieveReq
(
&
encoder
,
&
req
);
tEncoderClear
(
&
encoder
);
tEncoderClear
(
&
encoder
);
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
msgType
=
TDMT_STREAM_RETRIEVE
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
};
SRpcMsg
rpcMsg
=
{
.
code
=
0
,
.
msgType
=
TDMT_STREAM_RETRIEVE
,
.
pCont
=
buf
,
.
contLen
=
sizeof
(
SMsgHead
)
+
len
};
if
(
tmsgSendReq
(
&
pEpInfo
->
epSet
,
&
rpcMsg
)
<
0
)
{
if
(
tmsgSendReq
(
&
pEpInfo
->
epSet
,
&
rpcMsg
)
<
0
)
{
ASSERT
(
0
);
ASSERT
(
0
);
goto
CLEAR
;
goto
CLEAR
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录