Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dfb2b067
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dfb2b067
编写于
9月 30, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: coverity issues
上级
c6317e67
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
51 addition
and
40 deletion
+51
-40
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+17
-3
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+11
-11
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+17
-20
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+6
-6
未找到文件。
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
dfb2b067
...
...
@@ -42,9 +42,23 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI64
(
pEncoder
,
pObj
->
targetStbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
pObj
->
sql
!=
NULL
)
{
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeCStr
(
pEncoder
,
""
)
<
0
)
return
-
1
;
}
if
(
pObj
->
ast
!=
NULL
)
{
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
ast
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeCStr
(
pEncoder
,
""
)
<
0
)
return
-
1
;
}
if
(
pObj
->
physicalPlan
!=
NULL
)
{
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeCStr
(
pEncoder
,
""
)
<
0
)
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
dfb2b067
...
...
@@ -554,7 +554,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for
(
int32_t
i
=
0
;
i
<
pCreate
->
numOfColumns
;
++
i
)
{
SField
*
pField1
=
taosArrayGet
(
pCreate
->
pColumns
,
i
);
if
(
pField1
->
type
<
0
)
{
if
(
pField1
->
type
>=
TSDB_DATA_TYPE_MAX
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
...
...
@@ -570,7 +570,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for
(
int32_t
i
=
0
;
i
<
pCreate
->
numOfTags
;
++
i
)
{
SField
*
pField1
=
taosArrayGet
(
pCreate
->
pTags
,
i
);
if
(
pField1
->
type
<
0
)
{
if
(
pField1
->
type
>=
TSDB_DATA_TYPE_MAX
)
{
terrno
=
TSDB_CODE_MND_INVALID_STB_OPTION
;
return
-
1
;
}
...
...
@@ -982,8 +982,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
}
else
{
mError
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid
"
,
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
mError
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid
, origin tagVer:%d colVer:%d"
,
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
terrno
=
TSDB_CODE_MND_INVALID_SCHEMA_VER
;
goto
_OVER
;
}
...
...
@@ -1603,9 +1603,9 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return
-
1
;
}
strcpy
(
pRsp
->
dbFName
,
pStb
->
db
);
strcpy
(
pRsp
->
tbName
,
tbName
);
strcpy
(
pRsp
->
stbName
,
tbName
);
tstrncpy
(
pRsp
->
dbFName
,
pStb
->
db
,
sizeof
(
pRsp
->
dbFName
)
);
tstrncpy
(
pRsp
->
tbName
,
tbName
,
sizeof
(
pRsp
->
tbName
)
);
tstrncpy
(
pRsp
->
stbName
,
tbName
,
sizeof
(
pRsp
->
stbName
)
);
pRsp
->
dbId
=
pDb
->
uid
;
pRsp
->
numOfTags
=
pStb
->
numOfTags
;
pRsp
->
numOfColumns
=
pStb
->
numOfColumns
;
...
...
@@ -1649,9 +1649,9 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
return
-
1
;
}
strcpy
(
pRsp
->
dbFName
,
pStb
->
db
);
strcpy
(
pRsp
->
tbName
,
tbName
);
strcpy
(
pRsp
->
stbName
,
tbName
);
tstrncpy
(
pRsp
->
dbFName
,
pStb
->
db
,
sizeof
(
pRsp
->
dbFName
)
);
tstrncpy
(
pRsp
->
tbName
,
tbName
,
sizeof
(
pRsp
->
tbName
)
);
tstrncpy
(
pRsp
->
stbName
,
tbName
,
sizeof
(
pRsp
->
stbName
)
);
pRsp
->
numOfTags
=
pStb
->
numOfTags
;
pRsp
->
numOfColumns
=
pStb
->
numOfColumns
;
pRsp
->
tableType
=
TSDB_SUPER_TABLE
;
...
...
@@ -2551,7 +2551,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
maxDelay
,
false
);
char
rollup
[
1
28
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
rollup
[
1
60
+
VARSTR_HEADER_SIZE
]
=
{
0
};
int32_t
rollupNum
=
(
int32_t
)
taosArrayGetSize
(
pStb
->
pFuncs
);
for
(
int32_t
i
=
0
;
i
<
rollupNum
;
++
i
)
{
char
*
funcName
=
taosArrayGet
(
pStb
->
pFuncs
,
i
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
dfb2b067
...
...
@@ -425,8 +425,10 @@ static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStream
SStreamObj
streamObj
=
{
0
};
memcpy
(
streamObj
.
name
,
pStream
->
name
,
TSDB_STREAM_FNAME_LEN
);
streamObj
.
status
=
STREAM_STATUS__RECOVER
;
SSdbRaw
*
pCommitRaw
=
mndStreamActionEncode
(
&
streamObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
mError
(
"stream trans:%d, failed to append commit log since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -771,12 +773,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if
(
mndDropStreamTasks
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
mError
(
"stream:%s, failed to drop task since %s"
,
dropReq
.
name
,
terrstr
());
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
// drop stream
if
(
mndPersistDropStreamLog
(
pMnode
,
pTrans
,
pStream
)
<
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pStream
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
...
...
@@ -945,10 +949,8 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SName
n
;
int32_t
cols
=
0
;
char
streamName
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tNameFromString
(
&
n
,
pStream
->
name
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameGetDbName
(
&
n
,
varDataVal
(
streamName
));
varDataSetLen
(
streamName
,
strlen
(
varDataVal
(
streamName
)));
char
streamName
[
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
streamName
,
mndGetDbStr
(
pStream
->
name
),
sizeof
(
streamName
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
streamName
,
false
);
...
...
@@ -956,28 +958,24 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
createTime
,
false
);
char
sql
[
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
&
sql
[
VARSTR_HEADER_SIZE
],
pStream
->
sql
,
TSDB_SHOW_SQL_LEN
);
varDataSetLen
(
sql
,
strlen
(
&
sql
[
VARSTR_HEADER_SIZE
]));
STR_WITH_MAXSIZE_TO_VARSTR
(
streamName
,
pStream
->
sql
,
sizeof
(
sql
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
sql
,
false
);
char
status
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndShowStreamStatus
(
&
status
[
VARSTR_HEADER_SIZE
],
pStream
);
varDataSetLen
(
status
,
strlen
(
varDataVal
(
status
)));
char
status2
[
20
]
=
{
0
};
mndShowStreamStatus
(
status2
,
pStream
);
STR_WITH_MAXSIZE_TO_VARSTR
(
status
,
status2
,
sizeof
(
status
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
status
,
false
);
char
sourceDB
[
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tNameFromString
(
&
n
,
pStream
->
sourceDb
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameGetDbName
(
&
n
,
varDataVal
(
sourceDB
));
varDataSetLen
(
sourceDB
,
strlen
(
varDataVal
(
sourceDB
)));
STR_WITH_MAXSIZE_TO_VARSTR
(
sourceDB
,
mndGetDbStr
(
pStream
->
sourceDb
),
sizeof
(
sourceDB
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
sourceDB
,
false
);
char
targetDB
[
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tNameFromString
(
&
n
,
pStream
->
targetDb
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameGetDbName
(
&
n
,
varDataVal
(
targetDB
));
varDataSetLen
(
targetDB
,
strlen
(
varDataVal
(
targetDB
)));
STR_WITH_MAXSIZE_TO_VARSTR
(
targetDB
,
mndGetDbStr
(
pStream
->
targetDb
),
sizeof
(
targetDB
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
targetDB
,
false
);
...
...
@@ -986,9 +984,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
NULL
,
true
);
}
else
{
char
targetSTB
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tNameFromString
(
&
n
,
pStream
->
targetSTbName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
strcpy
(
&
targetSTB
[
VARSTR_HEADER_SIZE
],
tNameGetTableName
(
&
n
));
varDataSetLen
(
targetSTB
,
strlen
(
varDataVal
(
targetSTB
)));
STR_WITH_MAXSIZE_TO_VARSTR
(
targetSTB
,
mndGetStbStr
(
pStream
->
targetSTbName
),
sizeof
(
targetSTB
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
targetSTB
,
false
);
}
...
...
@@ -997,8 +993,9 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
watermark
,
false
);
char
trigger
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
mndShowStreamTrigger
(
&
trigger
[
VARSTR_HEADER_SIZE
],
pStream
);
varDataSetLen
(
trigger
,
strlen
(
varDataVal
(
trigger
)));
char
trigger2
[
20
]
=
{
0
};
mndShowStreamTrigger
(
trigger2
,
pStream
);
STR_WITH_MAXSIZE_TO_VARSTR
(
trigger
,
trigger2
,
sizeof
(
trigger
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
trigger
,
false
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
dfb2b067
...
...
@@ -328,14 +328,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
if
(
!
syncEnvIsStart
())
{
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
);
vGError
(
"vgId:%d, msg:%p failed to process since sync env not start"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_APP_ERROR
;
return
-
1
;
}
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
if
(
pSyncNode
==
NULL
)
{
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
);
vGError
(
"vgId:%d, msg:%p failed to process since invalid sync node"
,
pVnode
->
config
.
vgId
,
pMsg
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
...
...
@@ -394,7 +394,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
->
msgType
);
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
}
...
...
@@ -459,7 +459,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
->
msgType
);
vGError
(
"vgId:%d, msg:%p failed to process since error msg type:%d"
,
pVnode
->
config
.
vgId
,
pMsg
,
pMsg
->
msgType
);
code
=
-
1
;
}
}
...
...
@@ -630,7 +630,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
vInfo
(
"vgId:%d, start write vnode snapshot since apply queue is empty"
,
pVnode
->
config
.
vgId
);
break
;
}
else
{
vInfo
(
"vgId:%d, write vnode snapshot later since %d items in apply queue"
,
pVnode
->
config
.
vgId
);
vInfo
(
"vgId:%d, write vnode snapshot later since %d items in apply queue"
,
pVnode
->
config
.
vgId
,
itemSize
);
taosMsleep
(
10
);
}
}
while
(
true
);
...
...
@@ -683,7 +683,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
vInfo
(
"vgId:%d, apply queue is empty, restore finish"
,
pVnode
->
config
.
vgId
);
break
;
}
else
{
vInfo
(
"vgId:%d, restore not finish since %d items in apply queue"
,
pVnode
->
config
.
vgId
);
vInfo
(
"vgId:%d, restore not finish since %d items in apply queue"
,
pVnode
->
config
.
vgId
,
itemSize
);
taosMsleep
(
10
);
}
}
while
(
true
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录