Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c2616597
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
c2616597
编写于
7月 30, 2023
作者:
W
wade zhang
提交者:
GitHub
7月 30, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #22227 from taosdata/fix/TS-3728
fix:add macro ASSERT_NOT_CORE to control ASSERT not core in release version
上级
80ad5a0a
bf74f9e2
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
72 addition
and
35 deletion
+72
-35
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+5
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+3
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+6
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+38
-21
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+3
-3
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+1
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/util/CMakeLists.txt
source/util/CMakeLists.txt
+7
-0
source/util/src/tlog.c
source/util/src/tlog.c
+4
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-1
tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
...st/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
+2
-2
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
c2616597
...
...
@@ -1859,8 +1859,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
){
if
(
!
pVg
->
seekUpdated
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is update, since seekupdate not set"
,
consumerId
);
if
(
reqOffset
->
type
!=
0
)
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
if
(
rspOffset
->
type
!=
0
)
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is NOT update, since seekupdate is set"
,
consumerId
);
}
...
...
@@ -1948,7 +1948,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
metaRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2036,7 +2036,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
source/common/src/tmsg.c
浏览文件 @
c2616597
...
...
@@ -7227,6 +7227,9 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return
pLeft
->
uid
==
pRight
->
uid
&&
pLeft
->
ts
==
pRight
->
ts
;
}
else
if
(
pLeft
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
return
pLeft
->
uid
==
pRight
->
uid
;
}
else
{
uError
(
"offset type:%d"
,
pLeft
->
type
);
ASSERT
(
0
);
}
}
return
false
;
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
c2616597
...
...
@@ -420,6 +420,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
data
->
topicName
);
if
(
pSub
==
NULL
){
ASSERT
(
0
);
continue
;
}
taosWLockLatch
(
&
pSub
->
lock
);
...
...
@@ -515,7 +516,10 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
char
*
topic
=
taosArrayGetP
(
pConsumer
->
currentTopics
,
i
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
topic
);
// txn guarantees pSub is created
if
(
pSub
==
NULL
)
continue
;
if
(
pSub
==
NULL
)
{
ASSERT
(
0
);
continue
;
}
taosRLockLatch
(
&
pSub
->
lock
);
SMqSubTopicEp
topicEp
=
{
0
};
...
...
@@ -524,6 +528,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
if
(
pTopic
==
NULL
)
{
ASSERT
(
0
);
taosRUnLockLatch
(
&
pSub
->
lock
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
continue
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
c2616597
...
...
@@ -1168,7 +1168,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
consumerIdHex
,
consumerId
==
-
1
);
m
Debug
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
m
Info
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
consumerId
,
varDataVal
(
cgroup
),
pVgEp
->
vgId
);
// offset
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
c2616597
...
...
@@ -146,6 +146,20 @@ void tqClose(STQ* pTq) {
return
;
}
void
*
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
NULL
);
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
if
(
pHandle
->
msg
!=
NULL
)
{
tqPushEmptyDataRsp
(
pHandle
,
vgId
);
rpcFreeCont
(
pHandle
->
msg
->
pCont
);
taosMemoryFree
(
pHandle
->
msg
);
pHandle
->
msg
=
NULL
;
}
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
pIter
);
}
tqOffsetClose
(
pTq
->
pOffsetStore
);
taosHashCleanup
(
pTq
->
pHandle
);
taosHashCleanup
(
pTq
->
pPushMgr
);
...
...
@@ -278,6 +292,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tqInitDataRsp
(
&
dataRsp
,
&
req
);
dataRsp
.
blockNum
=
0
;
dataRsp
.
rspOffset
=
dataRsp
.
reqOffset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"tqPushEmptyDataRsp to consumer:0x%"
PRIx64
" vgId:%d, offset:%s, reqId:0x%"
PRIx64
,
req
.
consumerId
,
vgId
,
buf
,
req
.
reqId
);
tqSendDataRsp
(
pHandle
,
pHandle
->
msg
,
&
req
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
tDeleteMqDataRsp
(
&
dataRsp
);
return
0
;
...
...
@@ -515,10 +533,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
tq
Debug
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
if
(
ASSERT
(
pHandle
->
msg
!=
NULL
))
{
tqError
(
"pHandle->msg should not be null"
);
taosHashCancelIterate
(
pTq
->
pPushMgr
,
pIter
);
break
;
}
else
{
SRpcMsg
msg
=
{.
msgType
=
TDMT_VND_TMQ_CONSUME
,
.
pCont
=
pHandle
->
msg
->
pCont
,
.
contLen
=
pHandle
->
msg
->
contLen
,
.
info
=
pHandle
->
msg
->
info
};
...
...
@@ -849,30 +868,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosWLockLatch
(
&
pTq
->
lock
);
if
(
pHandle
->
consumerId
==
req
.
newConsumerId
)
{
// do nothing
tqInfo
(
"vgId:%d consumer:0x%"
PRIx64
" remains, no switch occurs, should not reach here"
,
req
.
vgId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d no switch consumer:0x%"
PRIx64
" remains, because redo wal log"
,
req
.
vgId
,
req
.
newConsumerId
);
}
else
{
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
taosWUnLockLatch
(
&
pTq
->
lock
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
end:
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
c2616597
...
...
@@ -78,12 +78,12 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy
(
pHandle
->
msg
->
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
pHandle
->
msg
->
contLen
=
pMsg
->
contLen
;
int32_t
ret
=
taosHashPut
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
),
&
pHandle
,
POINTER_BYTES
);
tq
Debug
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
tq
Info
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
pHandle
->
consumerId
,
pHandle
,
pHandle
->
msg
->
pCont
,
pHandle
->
msg
->
contLen
);
return
0
;
}
int
32_t
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
int
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
STqHandle
*
pHandle
=
(
STqHandle
*
)
handle
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
...
...
@@ -91,7 +91,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
return
0
;
}
int32_t
ret
=
taosHashRemove
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
));
tq
Debug
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
if
(
pHandle
->
msg
!=
NULL
)
{
// tqPushDataRsp(pHandle, vgId);
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
c2616597
...
...
@@ -317,6 +317,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// the offset value can not be monotonious increase??
offset
=
reqOffset
;
}
else
{
uError
(
"req offset type is 0"
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
c2616597
...
...
@@ -476,8 +476,8 @@ void vnodeClose(SVnode *pVnode) {
tsem_wait
(
&
pVnode
->
canCommit
);
vnodeSyncClose
(
pVnode
);
vnodeQueryClose
(
pVnode
);
walClose
(
pVnode
->
pWal
);
tqClose
(
pVnode
->
pTq
);
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
smaClose
(
pVnode
->
pSma
);
if
(
pVnode
->
pMeta
)
metaClose
(
&
pVnode
->
pMeta
);
...
...
source/util/CMakeLists.txt
浏览文件 @
c2616597
...
...
@@ -5,6 +5,13 @@ if (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions
(
-DGRANTS_CFG
)
endif
()
IF
(
${
ASSERT_NOT_CORE
}
)
ADD_DEFINITIONS
(
-DASSERT_NOT_CORE
)
MESSAGE
(
STATUS
"disable assert core"
)
ELSE
()
MESSAGE
(
STATUS
"enable assert core"
)
ENDIF
(
${
ASSERT_NOT_CORE
}
)
target_include_directories
(
util
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/util"
...
...
source/util/src/tlog.c
浏览文件 @
c2616597
...
...
@@ -76,7 +76,11 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time. */
bool
tsLogEmbedded
=
0
;
bool
tsAsyncLog
=
true
;
#ifdef ASSERT_NOT_CORE
bool
tsAssert
=
false
;
#else
bool
tsAssert
=
true
;
#endif
int32_t
tsNumOfLogLines
=
10000000
;
int32_t
tsLogKeepDays
=
0
;
LogFp
tsLogFp
=
NULL
;
...
...
tests/parallel_test/cases.task
浏览文件 @
c2616597
...
...
@@ -105,7 +105,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
#
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
...
...
tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
浏览文件 @
c2616597
...
...
@@ -222,9 +222,9 @@ class TDTestCase:
actConsumeTotalRows
=
resultList
[
0
]
if
not
(
actConsumeTotalRows
>
0
and
actConsumeTotalRows
<
totalRowsInserted
):
if
not
(
actConsumeTotalRows
>
=
0
and
actConsumeTotalRows
<=
totalRowsInserted
):
tdLog
.
info
(
"act consume rows: %d"
%
(
actConsumeTotalRows
))
tdLog
.
info
(
"and second consume rows should be between
0 and %d
"
%
(
totalRowsInserted
))
tdLog
.
info
(
"and second consume rows should be between
[0 and %d]
"
%
(
totalRowsInserted
))
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
time
.
sleep
(
10
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录