Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b1594a68
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b1594a68
编写于
10月 10, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix coverity scan
上级
86d80ad4
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
57 addition
and
23 deletion
+57
-23
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+3
-0
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+5
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+8
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+8
-6
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+3
-1
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
+8
-3
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+3
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+1
-0
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+4
-4
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+8
-1
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+1
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+1
-2
source/libs/wal/src/walRef.c
source/libs/wal/src/walRef.c
+3
-1
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+1
-0
未找到文件。
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
b1594a68
...
...
@@ -149,10 +149,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
tDeleteSMqConsumerObj
(
pConsumerNew
);
taosMemoryFree
(
pConsumerNew
);
mndTransDrop
(
pTrans
);
return
0
;
FAIL:
tDeleteSMqConsumerObj
(
pConsumerNew
);
taosMemoryFree
(
pConsumerNew
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
b1594a68
...
...
@@ -118,7 +118,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
{
taosMemoryFree
(
pTask
);
return
-
1
;
}
taosArrayPush
(
pArray
,
&
pTask
);
}
taosArrayPush
(
pObj
->
tasks
,
&
pArray
);
...
...
@@ -353,7 +356,7 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
}
void
tDeleteSMqConsumerEp
(
void
*
data
)
{
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
data
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
data
;
taosArrayDestroyP
(
pConsumerEp
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
b1594a68
...
...
@@ -678,25 +678,30 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
if
(
pTrans
==
NULL
)
{
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mndReleaseSubscribe
(
pMnode
,
pSub
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
mInfo
(
"trans:%d, used to drop cgroup:%s on topic %s"
,
pTrans
->
id
,
dropReq
.
cgroup
,
dropReq
.
topic
);
if
(
mndDropOffsetBySubKey
(
pMnode
,
pTrans
,
pSub
->
key
)
<
0
)
{
ASSERT
(
0
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
if
(
mndSetDropSubCommitLogs
(
pMnode
,
pTrans
,
pSub
)
<
0
)
{
mError
(
"cgroup %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mndReleaseSubscribe
(
pMnode
,
pSub
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndTransPrepare
(
pMnode
,
pTrans
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
<
0
)
{
mndReleaseSubscribe
(
pMnode
,
pSub
);
mndTransDrop
(
pTrans
);
return
-
1
;
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
b1594a68
...
...
@@ -93,7 +93,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT
(
0
);
}
if
(
tqOffsetOpen
(
pTq
)
<
0
)
{
pTq
->
pOffsetStore
=
tqOffsetOpen
(
pTq
);
if
(
pTq
->
pOffsetStore
==
NULL
)
{
ASSERT
(
0
);
}
...
...
@@ -648,7 +649,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code
=
-
1
;
}
tDeleteSTaosxRsp
(
&
taosxRsp
);
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
...
...
@@ -671,7 +672,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code
=
-
1
;
}
tDeleteSTaosxRsp
(
&
taosxRsp
);
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
taosMemoryFreeClear
(
pCkHead
);
return
code
;
}
else
{
fetchVer
++
;
...
...
@@ -687,18 +688,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
metaRsp
.
metaRsp
=
pHead
->
body
;
if
(
tqSendMetaPollRsp
(
pTq
,
pMsg
,
pReq
,
&
metaRsp
)
<
0
)
{
code
=
-
1
;
taosMemoryFree
(
pCkHead
);
taosMemoryFree
Clear
(
pCkHead
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
code
=
0
;
if
(
pCkHead
)
taosMemoryFree
(
pCkHead
);
taosMemoryFreeClear
(
pCkHead
);
tDeleteSTaosxRsp
(
&
taosxRsp
);
return
code
;
}
}
}
tDeleteSTaosxRsp
(
&
taosxRsp
);
taosMemoryFreeClear
(
pCkHead
);
return
0
;
}
...
...
@@ -767,7 +769,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
STqHandle
*
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
==
NULL
)
{
if
(
req
.
oldConsumerId
!=
-
1
)
{
tqError
(
"vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld"
,
req
.
vgId
,
req
.
subKey
,
tqError
(
"vgId:%d, build new consumer handle %s for consumer %
l
d, but old consumerId is %ld"
,
req
.
vgId
,
req
.
subKey
,
req
.
newConsumerId
,
req
.
oldConsumerId
);
}
if
(
req
.
newConsumerId
==
-
1
)
{
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
b1594a68
...
...
@@ -170,11 +170,13 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
if
(
tDecodeSTqCheckInfo
(
&
decoder
,
&
info
)
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tdbTbcClose
(
pCur
);
return
-
1
;
}
tDecoderClear
(
&
decoder
);
if
(
taosHashPut
(
pTq
->
pCheckInfo
,
info
.
topic
,
strlen
(
info
.
topic
),
&
info
,
sizeof
(
STqCheckInfo
))
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tdbTbcClose
(
pCur
);
return
-
1
;
}
}
...
...
@@ -188,7 +190,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
tqDebug
(
"tq save %s(%d) consumer %
"
PRId64
" vgId:%d"
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
),
pHandle
->
consumerId
,
tqDebug
(
"tq save %s(%d) consumer %
ld vgId:%d"
,
pHandle
->
subKey
,
(
int32_t
)
strlen
(
pHandle
->
subKey
),
pHandle
->
consumerId
,
TD_VID
(
pTq
->
pVnode
));
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
...
...
source/dnode/vnode/src/tq/tqOffsetSnapshot.c
浏览文件 @
b1594a68
...
...
@@ -54,8 +54,8 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
char
*
fname
=
tqOffsetBuildFName
(
pReader
->
pTq
->
path
,
0
);
TdFilePtr
pFile
=
taosOpenFile
(
fname
,
TD_FILE_READ
);
taosMemoryFree
(
fname
);
if
(
pFile
!=
NULL
)
{
taosMemoryFree
(
fname
);
return
0
;
}
...
...
@@ -63,6 +63,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
if
(
taosStatFile
(
fname
,
&
sz
,
NULL
)
<
0
)
{
ASSERT
(
0
);
}
taosMemoryFree
(
fname
);
SSnapDataHdr
*
buf
=
taosMemoryCalloc
(
1
,
sz
+
sizeof
(
SSnapDataHdr
));
if
(
buf
==
NULL
)
{
...
...
@@ -120,9 +121,13 @@ int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) {
char
*
fname
=
tqOffsetBuildFName
(
pTq
->
path
,
0
);
if
(
rollback
)
{
taosRemoveFile
(
pWriter
->
fname
);
if
(
taosRemoveFile
(
pWriter
->
fname
)
<
0
)
{
ASSERT
(
0
);
}
}
else
{
taosRenameFile
(
pWriter
->
fname
,
fname
);
if
(
taosRenameFile
(
pWriter
->
fname
,
fname
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tqOffsetRestoreFromFile
(
pTq
->
pOffsetStore
,
fname
)
<
0
)
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
b1594a68
...
...
@@ -226,6 +226,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if
(
data
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tqError
(
"failed to copy data for stream since out of memory"
);
taosArrayDestroyP
(
cachedKeys
,
(
FDelete
)
taosMemoryFree
);
taosArrayDestroy
(
cachedKeyLens
);
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
...
...
@@ -299,6 +301,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
}
taosArrayDestroyP
(
cachedKeys
,
(
FDelete
)
taosMemoryFree
);
taosArrayDestroy
(
cachedKeyLens
);
taosMemoryFree
(
data
);
}
// unlock
taosWUnLockLatch
(
&
pTq
->
pushLock
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
b1594a68
...
...
@@ -253,6 +253,7 @@ STqReader* tqOpenReader(SVnode* pVnode) {
pReader
->
pWalReader
=
walOpenReader
(
pVnode
->
pWal
,
NULL
);
if
(
pReader
->
pWalReader
==
NULL
)
{
taosMemoryFree
(
pReader
);
return
NULL
;
}
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
b1594a68
...
...
@@ -97,12 +97,12 @@ static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) {
return
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
}
static
inline
int
walBuildLogName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%020"
PRId64
"."
WAL_LOG_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
static
inline
void
walBuildLogName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
sprintf
(
buf
,
"%s/%020"
PRId64
"."
WAL_LOG_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
static
inline
int
walBuildIdxName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%020"
PRId64
"."
WAL_INDEX_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
static
inline
void
walBuildIdxName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
sprintf
(
buf
,
"%s/%020"
PRId64
"."
WAL_INDEX_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
static
inline
int
walValidHeadCksum
(
SWalCkHead
*
pHead
)
{
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
b1594a68
...
...
@@ -157,6 +157,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
TdDirPtr
pDir
=
taosOpenDir
(
pWal
->
path
);
if
(
pDir
==
NULL
)
{
regfree
(
&
logRegPattern
);
regfree
(
&
idxRegPattern
);
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
...
...
@@ -304,7 +306,12 @@ int walCheckAndRepairIdx(SWal* pWal) {
return
-
1
;
}
while
(
idxEntry
.
ver
<
pFileInfo
->
lastVer
)
{
taosLSeekFile
(
pLogFile
,
idxEntry
.
offset
,
SEEK_SET
);
if
(
taosLSeekFile
(
pLogFile
,
idxEntry
.
offset
,
SEEK_SET
)
==
-
1
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, cannot seek file %s at %ld, since %s"
,
pWal
->
cfg
.
vgId
,
fLogNameStr
,
idxEntry
.
offset
,
terrstr
());
return
-
1
;
}
SWalCkHead
ckHead
;
taosReadFile
(
pLogFile
,
&
ckHead
,
sizeof
(
SWalCkHead
));
if
(
idxEntry
.
ver
!=
ckHead
.
head
.
version
)
{
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
b1594a68
...
...
@@ -98,6 +98,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
if
(
taosMkDir
(
pWal
->
path
)
!=
0
)
{
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
taosMemoryFree
(
pWal
);
return
NULL
;
}
...
...
source/libs/wal/src/walRead.c
浏览文件 @
b1594a68
...
...
@@ -494,7 +494,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
taosThreadMutexUnlock
(
&
pReader
->
mutex
);
return
-
1
;
}
pReader
->
pHead
=
ptr
;
pReader
->
pHead
=
(
SWalCkHead
*
)
ptr
;
pReader
->
capacity
=
pReader
->
pHead
->
head
.
bodyLen
;
}
...
...
@@ -504,7 +504,6 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
}
taosThreadMutexUnlock
(
&
pReader
->
mutex
);
return
-
1
;
...
...
source/libs/wal/src/walRef.c
浏览文件 @
b1594a68
...
...
@@ -33,7 +33,9 @@ SWalRef *walOpenRef(SWal *pWal) {
}
void
walCloseRef
(
SWal
*
pWal
,
int64_t
refId
)
{
SWalRef
*
pRef
=
*
(
SWalRef
**
)
taosHashGet
(
pWal
->
pRefHash
,
&
refId
,
sizeof
(
int64_t
));
SWalRef
**
ppRef
=
taosHashGet
(
pWal
->
pRefHash
,
&
refId
,
sizeof
(
int64_t
));
if
(
ppRef
==
NULL
)
return
;
SWalRef
*
pRef
=
*
ppRef
;
taosHashRemove
(
pWal
->
pRefHash
,
&
refId
,
sizeof
(
int64_t
));
taosMemoryFree
(
pRef
);
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
b1594a68
...
...
@@ -28,6 +28,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
SWalRef
*
pRef
=
(
SWalRef
*
)
pIter
;
if
(
pRef
->
refVer
!=
-
1
&&
pRef
->
refVer
<=
ver
)
{
taosHashCancelIterate
(
pWal
->
pRefHash
,
pIter
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录