Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1d58f85a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
1d58f85a
编写于
2月 21, 2023
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat:coverity scan
上级
8a925342
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
26 addition
and
7 deletion
+26
-7
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-2
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+5
-1
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+2
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+8
-3
source/libs/stream/src/streamCheckpoint.c
source/libs/stream/src/streamCheckpoint.c
+2
-0
source/libs/stream/src/streamData.c
source/libs/stream/src/streamData.c
+1
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-0
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+2
-0
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+2
-0
未找到文件。
include/common/tmsgdef.h
浏览文件 @
1d58f85a
...
...
@@ -172,8 +172,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_SERVER_VERSION
,
"server-version"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_UPTIME_TIMER
,
"uptime-timer"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_TMQ_LOST_CONSUMER_CLEAR
,
"lost-consumer-clear"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_STREAM_CHECKPOINT_TIMER
,
"stream-checkpoint-tmr"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
"stream-begin-checkpoint"
,
NULL
,
NULL
)
//
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
//
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE
(
TDMT_MND_MAX_MSG
,
"mnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
...
...
include/libs/stream/tstream.h
浏览文件 @
1d58f85a
...
...
@@ -175,20 +175,24 @@ typedef struct {
void
streamFreeQitem
(
SStreamQueueItem
*
data
);
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes);
int64_t streamQueueResSize(const SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes);
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes);
void streamQueueResClear(SStreamQueueRes* pRes);
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pNode);
#endif
typedef
struct
{
SStreamQueueNode
*
pHead
;
}
SStreamQueue1
;
#if 0
bool streamQueueHasTask(const SStreamQueue1* pQueue);
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
typedef
struct
{
STaosQueue
*
queue
;
...
...
@@ -636,7 +640,7 @@ void streamMetaClose(SStreamMeta* streamMeta);
int32_t
streamMetaSaveTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
int32_t
streamMetaAddTask
(
SStreamMeta
*
pMeta
,
int64_t
ver
,
SStreamTask
*
pTask
);
int32_t
streamMetaAddSerializedTask
(
SStreamMeta
*
pMeta
,
int64_t
startVer
,
char
*
msg
,
int32_t
msgLen
);
SStreamTask
*
streamMetaGetTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
//
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
);
void
streamMetaReleaseTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
1d58f85a
...
...
@@ -133,6 +133,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
}
}
#if 0
static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
int32_t contLen = 0;
void *pReq = mndBuildCheckpointTickMsg(&contLen, sec);
...
...
@@ -145,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
#endif
static
void
mndPullupTelem
(
SMnode
*
pMnode
)
{
mTrace
(
"pullup telem msg"
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
1d58f85a
...
...
@@ -39,7 +39,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj
static
int32_t
mndProcessCreateStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessStreamCheckpointTmr
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessStreamDoCheckpoint
(
SRpcMsg
*
pReq
);
//
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static
int32_t
mndProcessStreamMetaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndGetStreamMeta
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
...
...
@@ -66,8 +66,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_DROP_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_CHECKPOINT_TIMER
,
mndProcessStreamCheckpointTmr
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_STREAM_BEGIN_CHECKPOINT
,
mndProcessStreamDoCheckpoint
);
//
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
//
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle
(
pMnode
,
TDMT_STREAM_TASK_REPORT_CHECKPOINT
,
mndTransProcessRsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_STREAMS
,
mndRetrieveStream
);
...
...
@@ -778,6 +778,9 @@ _OVER:
tFreeStreamObj
(
&
streamObj
);
return
code
;
}
#if 0
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
...
...
@@ -942,6 +945,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0;
}
#endif
static
int32_t
mndProcessDropStreamReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SStreamObj
*
pStream
=
NULL
;
...
...
source/libs/stream/src/streamCheckpoint.c
浏览文件 @
1d58f85a
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "streamInc.h"
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
...
...
@@ -192,3 +193,4 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre
// set status normal
return 0;
}
#endif
source/libs/stream/src/streamData.c
浏览文件 @
1d58f85a
...
...
@@ -26,7 +26,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
ASSERT
(
pReq
->
blockNum
==
taosArrayGetSize
(
pReq
->
dataLen
));
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
SRetrieveTableRsp
*
pRetrieve
=
taosArrayGetP
(
pReq
->
data
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
pReq
->
data
,
i
);
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pArray
,
i
);
blockDecode
(
pDataBlock
,
pRetrieve
->
data
);
// TODO: refactor
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
1d58f85a
...
...
@@ -114,12 +114,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t
batchCnt
=
0
;
while
(
1
)
{
if
(
atomic_load_8
(
&
pTask
->
taskStatus
)
==
TASK_STATUS__DROPPING
)
{
taosArrayDestroy
(
pRes
);
return
0
;
}
SSDataBlock
*
output
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
taosArrayDestroy
(
pRes
);
return
-
1
;
}
if
(
output
==
NULL
)
{
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
1d58f85a
...
...
@@ -171,6 +171,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
}
#endif
#if 0
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
...
...
@@ -180,6 +181,7 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
return NULL;
}
}
#endif
SStreamTask
*
streamMetaAcquireTask
(
SStreamMeta
*
pMeta
,
int32_t
taskId
)
{
taosRLockLatch
(
&
pMeta
->
lock
);
...
...
source/libs/stream/src/streamQueue.c
浏览文件 @
1d58f85a
...
...
@@ -46,6 +46,7 @@ void streamQueueClose(SStreamQueue* queue) {
taosMemoryFree
(
queue
);
}
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
return true;
...
...
@@ -101,3 +102,4 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0};
}
#endif
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录