Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
47ae92af
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
47ae92af
编写于
7月 06, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
差异文件
fix:conflicts from main
上级
11e6856d
db63b436
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
1081 addition
and
187 deletion
+1081
-187
Jenkinsfile2
Jenkinsfile2
+1
-1
docs/en/12-taos-sql/02-database.md
docs/en/12-taos-sql/02-database.md
+1
-1
docs/examples/python/tmq_assignment_example.py
docs/examples/python/tmq_assignment_example.py
+1
-1
docs/zh/12-taos-sql/02-database.md
docs/zh/12-taos-sql/02-database.md
+1
-1
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+100
-62
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+3
-0
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+6
-1
source/dnode/vnode/src/vnd/vnodeRetention.c
source/dnode/vnode/src/vnd/vnodeRetention.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+7
-4
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+33
-74
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+8
-6
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+12
-5
source/util/src/tlog.c
source/util/src/tlog.c
+0
-13
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+4
-0
tests/parallel_test/run_case.sh
tests/parallel_test/run_case.sh
+2
-2
tests/system-test/2-query/diff.py
tests/system-test/2-query/diff.py
+35
-1
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+21
-1
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
+248
-0
tests/system-test/7-tmq/tmqDropConsumer.json
tests/system-test/7-tmq/tmqDropConsumer.json
+28
-0
tests/system-test/7-tmq/tmqDropConsumer.py
tests/system-test/7-tmq/tmqDropConsumer.py
+282
-0
tests/system-test/7-tmq/tmqMaxGroupIds.json
tests/system-test/7-tmq/tmqMaxGroupIds.json
+27
-0
tests/system-test/7-tmq/tmqMaxGroupIds.py
tests/system-test/7-tmq/tmqMaxGroupIds.py
+246
-0
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+14
-13
未找到文件。
Jenkinsfile2
浏览文件 @
47ae92af
...
@@ -314,7 +314,7 @@ def pre_test_build_win() {
...
@@ -314,7 +314,7 @@ def pre_test_build_win() {
cd %WIN_CONNECTOR_ROOT%
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.
6
python -m pip install taospy==2.7.
10
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
'''
return 1
return 1
...
...
docs/en/12-taos-sql/02-database.md
浏览文件 @
47ae92af
...
@@ -43,7 +43,7 @@ database_option: {
...
@@ -43,7 +43,7 @@ database_option: {
## Parameters
## Parameters
-
BUFFER: specifies the size (in MB) of the write buffer for each vnode. Enter a value between 3 and 16384. The default value is
9
6.
-
BUFFER: specifies the size (in MB) of the write buffer for each vnode. Enter a value between 3 and 16384. The default value is
25
6.
-
CACHEMODEL: specifies how the latest data in subtables is stored in the cache. The default value is none.
-
CACHEMODEL: specifies how the latest data in subtables is stored in the cache. The default value is none.
-
none: The latest data is not cached.
-
none: The latest data is not cached.
-
last_row: The last row of each subtable is cached. This option significantly improves the performance of the LAST_ROW function.
-
last_row: The last row of each subtable is cached. This option significantly improves the performance of the LAST_ROW function.
...
...
docs/examples/python/tmq_assignment_example.py
浏览文件 @
47ae92af
...
@@ -55,4 +55,4 @@ def taos_get_assignment_and_seek_demo():
...
@@ -55,4 +55,4 @@ def taos_get_assignment_and_seek_demo():
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
taos
ws
_get_assignment_and_seek_demo
()
taos_get_assignment_and_seek_demo
()
docs/zh/12-taos-sql/02-database.md
浏览文件 @
47ae92af
...
@@ -42,7 +42,7 @@ database_option: {
...
@@ -42,7 +42,7 @@ database_option: {
### 参数说明
### 参数说明
-
BUFFER: 一个 VNODE 写入内存池大小,单位为 MB,默认为
9
6,最小为 3,最大为 16384。
-
BUFFER: 一个 VNODE 写入内存池大小,单位为 MB,默认为
25
6,最小为 3,最大为 16384。
-
CACHEMODEL:表示是否在内存中缓存子表的最近数据。默认为 none。
-
CACHEMODEL:表示是否在内存中缓存子表的最近数据。默认为 none。
-
none:表示不缓存。
-
none:表示不缓存。
-
last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。
-
last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。
...
...
source/client/src/clientTmq.c
浏览文件 @
47ae92af
...
@@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
...
@@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pParamSet
->
callbackFn
=
pCommitFp
;
pParamSet
->
callbackFn
=
pCommitFp
;
pParamSet
->
userParam
=
userParam
;
pParamSet
->
userParam
=
userParam
;
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" do manual commit offset for %s, vgId:%d"
,
tmq
->
consumerId
,
pTopicName
,
vgId
);
tscDebug
(
"consumer:0x%"
PRIx64
" do manual commit offset for %s, vgId:%d"
,
tmq
->
consumerId
,
pTopicName
,
vgId
);
...
@@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
...
@@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pTopicName
,
numOfTopics
);
pTopicName
,
numOfTopics
);
taosMemoryFree
(
pParamSet
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
return
;
}
}
...
@@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
...
@@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
vgId
,
numOfVgroups
,
pTopicName
);
vgId
,
numOfVgroups
,
pTopicName
);
taosMemoryFree
(
pParamSet
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
return
;
}
}
...
@@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
...
@@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree
(
pParamSet
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
code
,
userParam
);
pCommitFp
(
tmq
,
code
,
userParam
);
}
}
taosRUnLockLatch
(
&
tmq
->
lock
);
}
}
static
void
asyncCommitAllOffsets
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
static
void
asyncCommitAllOffsets
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
...
@@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
...
@@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
// init as 1 to prevent concurrency issue
// init as 1 to prevent concurrency issue
pParamSet
->
waitingRspNum
=
1
;
pParamSet
->
waitingRspNum
=
1
;
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to commit offset for %d topics"
,
tmq
->
consumerId
,
numOfTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to commit offset for %d topics"
,
tmq
->
consumerId
,
numOfTopics
);
...
@@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
...
@@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
}
}
}
}
}
}
taosRUnLockLatch
(
&
tmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" total commit:%d for %d topics"
,
tmq
->
consumerId
,
pParamSet
->
waitingRspNum
-
1
,
tscDebug
(
"consumer:0x%"
PRIx64
" total commit:%d for %d topics"
,
tmq
->
consumerId
,
pParamSet
->
waitingRspNum
-
1
,
numOfTopics
);
numOfTopics
);
...
@@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
...
@@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq
req
=
{
0
};
SMqHbReq
req
=
{
0
};
req
.
consumerId
=
tmq
->
consumerId
;
req
.
consumerId
=
tmq
->
consumerId
;
req
.
epoch
=
tmq
->
epoch
;
req
.
epoch
=
tmq
->
epoch
;
taosRLockLatch
(
&
tmq
->
lock
);
// if(tmq->needReportOffsetRows){
// if(tmq->needReportOffsetRows){
req
.
topics
=
taosArrayInit
(
taosArrayGetSize
(
tmq
->
clientTopics
),
sizeof
(
TopicOffsetRows
));
req
.
topics
=
taosArrayInit
(
taosArrayGetSize
(
tmq
->
clientTopics
),
sizeof
(
TopicOffsetRows
));
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
){
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
){
...
@@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
...
@@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
}
}
// tmq->needReportOffsetRows = false;
// tmq->needReportOffsetRows = false;
// }
// }
taosRUnLockLatch
(
&
tmq
->
lock
);
int32_t
tlen
=
tSerializeSMqHbReq
(
NULL
,
0
,
&
req
);
int32_t
tlen
=
tSerializeSMqHbReq
(
NULL
,
0
,
&
req
);
if
(
tlen
<
0
)
{
if
(
tlen
<
0
)
{
...
@@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
...
@@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if
(
*
topics
==
NULL
)
{
if
(
*
topics
==
NULL
)
{
*
topics
=
tmq_list_new
();
*
topics
=
tmq_list_new
();
}
}
taosRLockLatch
(
&
tmq
->
lock
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
topic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
SMqClientTopic
*
topic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
tmq_list_append
(
*
topics
,
strchr
(
topic
->
topicName
,
'.'
)
+
1
);
tmq_list_append
(
*
topics
,
strchr
(
topic
->
topicName
,
'.'
)
+
1
);
}
}
taosRUnLockLatch
(
&
tmq
->
lock
);
return
0
;
return
0
;
}
}
...
@@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) {
...
@@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) {
static
bool
doUpdateLocalEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
const
SMqAskEpRsp
*
pRsp
)
{
static
bool
doUpdateLocalEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
const
SMqAskEpRsp
*
pRsp
)
{
bool
set
=
false
;
bool
set
=
false
;
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscInfo
(
"consumer:0x%"
PRIx64
" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
,
topicNumCur
);
if
(
epoch
<=
tmq
->
epoch
)
{
if
(
epoch
<=
tmq
->
epoch
)
{
return
false
;
return
false
;
}
}
...
@@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
...
@@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
return
false
;
return
false
;
}
}
taosWLockLatch
(
&
tmq
->
lock
);
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscInfo
(
"consumer:0x%"
PRIx64
" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
,
topicNumCur
);
// todo extract method
// todo extract method
for
(
int32_t
i
=
0
;
i
<
topicNumCur
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
topicNumCur
;
i
++
)
{
// find old topic
// find old topic
...
@@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
...
@@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosHashCleanup
(
pVgOffsetHashMap
);
taosHashCleanup
(
pVgOffsetHashMap
);
taosWLockLatch
(
&
tmq
->
lock
);
// destroy current buffered existed topics info
// destroy current buffered existed topics info
if
(
tmq
->
clientTopics
)
{
if
(
tmq
->
clientTopics
)
{
taosArrayDestroyEx
(
tmq
->
clientTopics
,
freeClientVgInfo
);
taosArrayDestroyEx
(
tmq
->
clientTopics
,
freeClientVgInfo
);
...
@@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
...
@@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if
(
atomic_load_8
(
&
tmq
->
status
)
==
TMQ_CONSUMER_STATUS__RECOVER
){
if
(
atomic_load_8
(
&
tmq
->
status
)
==
TMQ_CONSUMER_STATUS__RECOVER
){
return
0
;
return
0
;
}
}
int32_t
code
=
0
;
taosWLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to poll data, numOfTopics:%d"
,
tmq
->
consumerId
,
numOfTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" start to poll data, numOfTopics:%d"
,
tmq
->
consumerId
,
numOfTopics
);
...
@@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
...
@@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for
(
int
j
=
0
;
j
<
numOfVg
;
j
++
)
{
for
(
int
j
=
0
;
j
<
numOfVg
;
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
taosGetTimestampMs
()
-
pVg
->
emptyBlockReceiveTs
<
EMPTY_BLOCK_POLL_IDLE_DURATION
)
{
// less than 10
0
ms
if
(
taosGetTimestampMs
()
-
pVg
->
emptyBlockReceiveTs
<
EMPTY_BLOCK_POLL_IDLE_DURATION
)
{
// less than 10ms
tscTrace
(
"consumer:0x%"
PRIx64
" epoch %d, vgId:%d idle for 10ms before start next poll"
,
tmq
->
consumerId
,
tscTrace
(
"consumer:0x%"
PRIx64
" epoch %d, vgId:%d idle for 10ms before start next poll"
,
tmq
->
consumerId
,
tmq
->
epoch
,
pVg
->
vgId
);
tmq
->
epoch
,
pVg
->
vgId
);
continue
;
continue
;
...
@@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
...
@@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
int32_t
code
=
doTmqPollImpl
(
tmq
,
pTopic
,
pVg
,
timeout
);
code
=
doTmqPollImpl
(
tmq
,
pTopic
,
pVg
,
timeout
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
end
;
}
}
}
}
}
}
tscDebug
(
"consumer:0x%"
PRIx64
" end to poll data"
,
tmq
->
consumerId
);
end:
return
0
;
taosWUnLockLatch
(
&
tmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" end to poll data, code:%d"
,
tmq
->
consumerId
,
code
);
return
code
;
}
}
static
int32_t
tmqHandleNoPollRsp
(
tmq_t
*
tmq
,
SMqRspWrapper
*
rspWrapper
,
bool
*
pReset
)
{
static
int32_t
tmqHandleNoPollRsp
(
tmq_t
*
tmq
,
SMqRspWrapper
*
rspWrapper
,
bool
*
pReset
)
{
...
@@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp
*
pDataRsp
=
&
pollRspWrapper
->
dataRsp
;
SMqDataRsp
*
pDataRsp
=
&
pollRspWrapper
->
dataRsp
;
if
(
pDataRsp
->
head
.
epoch
==
consumerEpoch
)
{
if
(
pDataRsp
->
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
return
NULL
;
}
}
// update the epset
// update the epset
...
@@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pDataRsp
->
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pDataRsp
->
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
pollRspWrapper
->
reqId
);
taosFreeQitem
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
return
pRsp
;
}
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
...
@@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug
(
"consumer:0x%"
PRIx64
" process meta rsp"
,
tmq
->
consumerId
);
tscDebug
(
"consumer:0x%"
PRIx64
" process meta rsp"
,
tmq
->
consumerId
);
if
(
pollRspWrapper
->
metaRsp
.
head
.
epoch
==
consumerEpoch
)
{
if
(
pollRspWrapper
->
metaRsp
.
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
return
NULL
;
}
}
...
@@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
// build rsp
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
SMqMetaRspObj
*
pRsp
=
tmqBuildMetaRspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
return
pRsp
;
}
else
{
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
...
@@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
taosxRsp
.
head
.
epoch
==
consumerEpoch
)
{
if
(
pollRspWrapper
->
taosxRsp
.
head
.
epoch
==
consumerEpoch
)
{
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
SMqClientVg
*
pVg
=
getVgInfo
(
tmq
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
vgHandle
=
pVg
;
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
pollRspWrapper
->
topicHandle
=
getTopicInfo
(
tmq
,
pollRspWrapper
->
topicName
);
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
if
(
pollRspWrapper
->
vgHandle
==
NULL
||
pollRspWrapper
->
topicHandle
==
NULL
){
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
tscError
(
"consumer:0x%"
PRIx64
" get vg or topic error, topic:%s vgId:%d"
,
tmq
->
consumerId
,
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
pollRspWrapper
->
topicName
,
pollRspWrapper
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
NULL
;
return
NULL
;
}
}
...
@@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
...
@@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg
->
emptyBlockReceiveTs
=
taosGetTimestampMs
();
pVg
->
emptyBlockReceiveTs
=
taosGetTimestampMs
();
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
continue
;
}
else
{
}
else
{
pVg
->
emptyBlockReceiveTs
=
0
;
// reset the ts
pVg
->
emptyBlockReceiveTs
=
0
;
// reset the ts
}
// build rsp
void
*
pRsp
=
NULL
;
// build rsp
int64_t
numOfRows
=
0
;
void
*
pRsp
=
NULL
;
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
int64_t
numOfRows
=
0
;
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
}
else
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
else
{
}
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
,
pVg
,
&
numOfRows
);
}
tmq
->
totalRows
+=
numOfRows
;
tmq
->
totalRows
+=
numOfRows
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
currentOffset
);
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
", total:%"
PRId64
", reqId:0x%"
PRIx64
,
", vg total:%"
PRId64
", total:%"
PRId64
", reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
tmq
->
totalRows
,
pollRspWrapper
->
reqId
);
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
taosFreeQitem
(
pollRspWrapper
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tscDebug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
...
@@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
...
@@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
}
}
}
static
void
displayConsumeStatistics
(
const
tmq_t
*
pTmq
)
{
static
void
displayConsumeStatistics
(
tmq_t
*
pTmq
)
{
taosRLockLatch
(
&
pTmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
pTmq
->
clientTopics
);
int32_t
numOfTopics
=
taosArrayGetSize
(
pTmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" closing poll:%"
PRId64
" rows:%"
PRId64
" topics:%d, final epoch:%d"
,
tscDebug
(
"consumer:0x%"
PRIx64
" closing poll:%"
PRId64
" rows:%"
PRId64
" topics:%d, final epoch:%d"
,
pTmq
->
consumerId
,
pTmq
->
pollCnt
,
pTmq
->
totalRows
,
numOfTopics
,
pTmq
->
epoch
);
pTmq
->
consumerId
,
pTmq
->
pollCnt
,
pTmq
->
totalRows
,
numOfTopics
,
pTmq
->
epoch
);
...
@@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
...
@@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
tscDebug
(
"topic:%s, %d. vgId:%d rows:%"
PRId64
,
pTopics
->
topicName
,
j
,
pVg
->
vgId
,
pVg
->
numOfRows
);
tscDebug
(
"topic:%s, %d. vgId:%d rows:%"
PRId64
,
pTopics
->
topicName
,
j
,
pVg
->
vgId
,
pVg
->
numOfRows
);
}
}
}
}
taosRUnLockLatch
(
&
pTmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" rows dist end"
,
pTmq
->
consumerId
);
tscDebug
(
"consumer:0x%"
PRIx64
" rows dist end"
,
pTmq
->
consumerId
);
}
}
...
@@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t
*
numOfAssignment
)
{
int32_t
*
numOfAssignment
)
{
*
numOfAssignment
=
0
;
*
numOfAssignment
=
0
;
*
assignment
=
NULL
;
*
assignment
=
NULL
;
SMqVgCommon
*
pCommon
=
NULL
;
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
128
]
=
{
0
};
char
tname
[
128
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
if
(
pTopic
==
NULL
)
{
return
TSDB_CODE_INVALID_PARA
;
code
=
TSDB_CODE_INVALID_PARA
;
goto
end
;
}
}
// in case of snapshot is opened, no valid offset will return
// in case of snapshot is opened, no valid offset will return
...
@@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
if
(
*
assignment
==
NULL
)
{
if
(
*
assignment
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" failed to malloc buffer, size:%"
PRIzu
,
tmq
->
consumerId
,
tscError
(
"consumer:0x%"
PRIx64
" failed to malloc buffer, size:%"
PRIzu
,
tmq
->
consumerId
,
(
*
numOfAssignment
)
*
sizeof
(
tmq_topic_assignment
));
(
*
numOfAssignment
)
*
sizeof
(
tmq_topic_assignment
));
return
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
}
bool
needFetch
=
false
;
bool
needFetch
=
false
;
...
@@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
}
if
(
needFetch
)
{
if
(
needFetch
)
{
SMqVgCommon
*
pCommon
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqVgCommon
));
pCommon
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqVgCommon
));
if
(
pCommon
==
NULL
)
{
if
(
pCommon
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
terrno
;
code
=
terrno
;
goto
end
;
}
}
pCommon
->
pList
=
taosArrayInit
(
4
,
sizeof
(
tmq_topic_assignment
));
pCommon
->
pList
=
taosArrayInit
(
4
,
sizeof
(
tmq_topic_assignment
));
...
@@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqVgWalInfoParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqVgWalInfoParam
));
SMqVgWalInfoParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqVgWalInfoParam
));
if
(
pParam
==
NULL
)
{
if
(
pParam
==
NULL
)
{
destroyCommonInfo
(
pCommon
)
;
code
=
terrno
;
return
terrno
;
goto
end
;
}
}
pParam
->
epoch
=
tmq
->
epoch
;
pParam
->
epoch
=
tmq
->
epoch
;
...
@@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
if
(
msgSize
<
0
)
{
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
code
=
terrno
;
return
terrno
;
goto
end
;
}
}
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
if
(
NULL
==
msg
)
{
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
code
=
terrno
;
return
terrno
;
goto
end
;
}
}
if
(
tSerializeSMqPollReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
if
(
tSerializeSMqPollReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
taosMemoryFree
(
msg
);
taosMemoryFree
(
msg
);
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pParam
);
destroyCommonInfo
(
pCommon
)
;
code
=
terrno
;
return
terrno
;
goto
end
;
}
}
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
{
if
(
sendInfo
==
NULL
)
{
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pParam
);
taosMemoryFree
(
msg
);
taosMemoryFree
(
msg
);
destroyCommonInfo
(
pCommon
)
;
code
=
terrno
;
return
terrno
;
goto
end
;
}
}
sendInfo
->
msgInfo
=
(
SDataBuf
){.
pData
=
msg
,
.
len
=
msgSize
,
.
handle
=
NULL
};
sendInfo
->
msgInfo
=
(
SDataBuf
){.
pData
=
msg
,
.
len
=
msgSize
,
.
handle
=
NULL
};
...
@@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
}
tsem_wait
(
&
pCommon
->
rsp
);
tsem_wait
(
&
pCommon
->
rsp
);
int32_t
code
=
pCommon
->
code
;
code
=
pCommon
->
code
;
terrno
=
code
;
terrno
=
code
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
*
assignment
);
goto
end
;
*
assignment
=
NULL
;
}
*
numOfAssignment
=
0
;
int32_t
num
=
taosArrayGetSize
(
pCommon
->
pList
);
}
else
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
num
=
taosArrayGetSize
(
pCommon
->
pList
);
(
*
assignment
)[
i
]
=
*
(
tmq_topic_assignment
*
)
taosArrayGet
(
pCommon
->
pList
,
i
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
(
*
assignment
)[
i
]
=
*
(
tmq_topic_assignment
*
)
taosArrayGet
(
pCommon
->
pList
,
i
);
}
*
numOfAssignment
=
num
;
}
}
*
numOfAssignment
=
num
;
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
tmq_topic_assignment
*
p
=
&
(
*
assignment
)[
j
];
tmq_topic_assignment
*
p
=
&
(
*
assignment
)[
j
];
...
@@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
...
@@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pOffsetInfo
->
committedOffset
.
version
=
p
->
currentOffset
;
pOffsetInfo
->
committedOffset
.
version
=
p
->
currentOffset
;
}
}
}
}
}
destroyCommonInfo
(
pCommon
);
end:
return
code
;
if
(
code
!=
TSDB_CODE_SUCCESS
){
}
else
{
taosMemoryFree
(
*
assignment
);
return
TSDB_CODE_SUCCESS
;
*
assignment
=
NULL
;
*
numOfAssignment
=
0
;
}
}
destroyCommonInfo
(
pCommon
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
code
;
}
}
void
tmq_free_assignment
(
tmq_topic_assignment
*
pAssignment
)
{
void
tmq_free_assignment
(
tmq_topic_assignment
*
pAssignment
)
{
...
@@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
...
@@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
char
tname
[
128
]
=
{
0
};
char
tname
[
128
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
if
(
pTopic
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid topic name:%s"
,
tmq
->
consumerId
,
pTopicName
);
tscError
(
"consumer:0x%"
PRIx64
" invalid topic name:%s"
,
tmq
->
consumerId
,
pTopicName
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
...
@@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
...
@@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if
(
pVg
==
NULL
)
{
if
(
pVg
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid vgroup id:%d"
,
tmq
->
consumerId
,
vgId
);
tscError
(
"consumer:0x%"
PRIx64
" invalid vgroup id:%d"
,
tmq
->
consumerId
,
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
...
@@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
...
@@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int32_t
type
=
pOffsetInfo
->
currentOffset
.
type
;
int32_t
type
=
pOffsetInfo
->
currentOffset
.
type
;
if
(
type
!=
TMQ_OFFSET__LOG
&&
!
OFFSET_IS_RESET_OFFSET
(
type
))
{
if
(
type
!=
TMQ_OFFSET__LOG
&&
!
OFFSET_IS_RESET_OFFSET
(
type
))
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, seek not allowed"
,
tmq
->
consumerId
,
type
);
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, seek not allowed"
,
tmq
->
consumerId
,
type
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
if
(
type
==
TMQ_OFFSET__LOG
&&
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
))
{
if
(
type
==
TMQ_OFFSET__LOG
&&
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
))
{
tscError
(
"consumer:0x%"
PRIx64
" invalid seek params, offset:%"
PRId64
", valid range:[%"
PRId64
", %"
PRId64
"]"
,
tscError
(
"consumer:0x%"
PRIx64
" invalid seek params, offset:%"
PRId64
", valid range:[%"
PRId64
", %"
PRId64
"]"
,
tmq
->
consumerId
,
offset
,
pOffsetInfo
->
walVerBegin
,
pOffsetInfo
->
walVerEnd
);
tmq
->
consumerId
,
offset
,
pOffsetInfo
->
walVerBegin
,
pOffsetInfo
->
walVerEnd
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
...
@@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
...
@@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
tstrncpy
(
rspObj
.
topic
,
tname
,
tListLen
(
rspObj
.
topic
));
tstrncpy
(
rspObj
.
topic
,
tname
,
tListLen
(
rspObj
.
topic
));
tscInfo
(
"consumer:0x%"
PRIx64
" seek to %"
PRId64
" on vgId:%d"
,
tmq
->
consumerId
,
offset
,
pVg
->
vgId
);
tscInfo
(
"consumer:0x%"
PRIx64
" seek to %"
PRId64
" on vgId:%d"
,
tmq
->
consumerId
,
offset
,
pVg
->
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
SSyncCommitInfo
*
pInfo
=
taosMemoryMalloc
(
sizeof
(
SSyncCommitInfo
));
SSyncCommitInfo
*
pInfo
=
taosMemoryMalloc
(
sizeof
(
SSyncCommitInfo
));
if
(
pInfo
==
NULL
)
{
if
(
pInfo
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
47ae92af
...
@@ -818,6 +818,9 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
...
@@ -818,6 +818,9 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
curRowIndex
,
(
const
char
*
)
&
pQuery
->
stableQuery
,
false
);
colDataSetVal
(
pColInfo
,
curRowIndex
,
(
const
char
*
)
&
pQuery
->
stableQuery
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
curRowIndex
,
(
const
char
*
)
&
pQuery
->
isSubQuery
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
curRowIndex
,
(
const
char
*
)
&
pQuery
->
subPlanNum
,
false
);
colDataSetVal
(
pColInfo
,
curRowIndex
,
(
const
char
*
)
&
pQuery
->
subPlanNum
,
false
);
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
47ae92af
...
@@ -360,7 +360,12 @@ static int32_t vnodeCommitTask(void *arg) {
...
@@ -360,7 +360,12 @@ static int32_t vnodeCommitTask(void *arg) {
// commit
// commit
code
=
vnodeCommitImpl
(
pInfo
);
code
=
vnodeCommitImpl
(
pInfo
);
if
(
code
)
goto
_exit
;
if
(
code
)
{
vFatal
(
"vgId:%d, failed to commit vnode since %s"
,
TD_VID
(
pVnode
),
terrstr
());
taosMsleep
(
100
);
exit
(
EXIT_FAILURE
);
goto
_exit
;
}
vnodeReturnBufPool
(
pVnode
);
vnodeReturnBufPool
(
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeRetention.c
浏览文件 @
47ae92af
...
@@ -121,7 +121,7 @@ int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
...
@@ -121,7 +121,7 @@ int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
_exit:
_exit:
if
(
code
)
{
if
(
code
)
{
vError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
p
Info
->
p
Vnode
),
__func__
,
lino
,
tstrerror
(
code
));
vError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
if
(
pInfo
)
taosMemoryFree
(
pInfo
);
if
(
pInfo
)
taosMemoryFree
(
pInfo
);
}
else
{
}
else
{
vInfo
(
"vgId:%d %s done"
,
TD_VID
(
pInfo
->
pVnode
),
__func__
);
vInfo
(
"vgId:%d %s done"
,
TD_VID
(
pInfo
->
pVnode
),
__func__
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
47ae92af
...
@@ -581,7 +581,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
...
@@ -581,7 +581,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in vnode query queue is processing"
);
vTrace
(
"message in vnode query queue is processing"
);
if
((
pMsg
->
msgType
==
TDMT_SCH_QUERY
||
pMsg
->
msgType
==
TDMT_VND_TMQ_CONSUME
||
pMsg
->
msgType
==
TDMT_VND_TMQ_CONSUME_PUSH
)
&&
!
syncIsReadyForRead
(
pVnode
->
sync
))
{
if
((
pMsg
->
msgType
==
TDMT_SCH_QUERY
||
pMsg
->
msgType
==
TDMT_VND_TMQ_CONSUME
||
pMsg
->
msgType
==
TDMT_VND_TMQ_CONSUME_PUSH
)
&&
!
syncIsReadyForRead
(
pVnode
->
sync
))
{
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
,
terrno
);
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
,
terrno
);
return
0
;
return
0
;
}
}
...
@@ -637,8 +639,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
...
@@ -637,8 +639,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
vnodeGetTableCfg
(
pVnode
,
pMsg
,
true
);
return
vnodeGetTableCfg
(
pVnode
,
pMsg
,
true
);
case
TDMT_VND_BATCH_META
:
case
TDMT_VND_BATCH_META
:
return
vnodeGetBatchMeta
(
pVnode
,
pMsg
);
return
vnodeGetBatchMeta
(
pVnode
,
pMsg
);
// case TDMT_VND_TMQ_CONSUME:
// case TDMT_VND_TMQ_CONSUME:
// return tqProcessPollReq(pVnode->pTq, pMsg);
// return tqProcessPollReq(pVnode->pTq, pMsg);
case
TDMT_VND_TMQ_VG_WALINFO
:
case
TDMT_VND_TMQ_VG_WALINFO
:
return
tqProcessVgWalInfoReq
(
pVnode
->
pTq
,
pMsg
);
return
tqProcessVgWalInfoReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_RUN
:
case
TDMT_STREAM_TASK_RUN
:
...
@@ -1370,7 +1372,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
...
@@ -1370,7 +1372,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
}
}
if
(
info
.
suid
)
{
if
(
info
.
suid
)
{
metaGetInfo
(
pVnode
->
pMeta
,
info
.
suid
,
&
info
,
NULL
);
code
=
metaGetInfo
(
pVnode
->
pMeta
,
info
.
suid
,
&
info
,
NULL
);
ASSERT
(
code
==
0
);
}
}
if
(
pSubmitTbData
->
sver
!=
info
.
skmVer
)
{
if
(
pSubmitTbData
->
sver
!=
info
.
skmVer
)
{
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
47ae92af
...
@@ -2702,13 +2702,12 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2702,13 +2702,12 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
}
}
static
int32_t
doHandleDiff
(
SDiffInfo
*
pDiffInfo
,
int32_t
type
,
const
char
*
pv
,
SColumnInfoData
*
pOutput
,
int32_t
pos
,
static
int32_t
doHandleDiff
(
SDiffInfo
*
pDiffInfo
,
int32_t
type
,
const
char
*
pv
,
SColumnInfoData
*
pOutput
,
int32_t
pos
,
int32_t
order
,
int64_t
ts
)
{
int64_t
ts
)
{
int32_t
factor
=
(
order
==
TSDB_ORDER_ASC
)
?
1
:
-
1
;
pDiffInfo
->
prevTs
=
ts
;
pDiffInfo
->
prevTs
=
ts
;
switch
(
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_INT
:
{
case
TSDB_DATA_TYPE_INT
:
{
int32_t
v
=
*
(
int32_t
*
)
pv
;
int32_t
v
=
*
(
int32_t
*
)
pv
;
int64_t
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
i64
)
;
// direct previous may be null
int64_t
delta
=
v
-
pDiffInfo
->
prev
.
i64
;
// direct previous may be null
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2721,7 +2720,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2721,7 +2720,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
v
=
*
(
int8_t
*
)
pv
;
int8_t
v
=
*
(
int8_t
*
)
pv
;
int64_t
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
i64
)
;
// direct previous may be null
int64_t
delta
=
v
-
pDiffInfo
->
prev
.
i64
;
// direct previous may be null
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2732,7 +2731,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2732,7 +2731,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
}
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
v
=
*
(
int16_t
*
)
pv
;
int16_t
v
=
*
(
int16_t
*
)
pv
;
int64_t
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
i64
)
;
// direct previous may be null
int64_t
delta
=
v
-
pDiffInfo
->
prev
.
i64
;
// direct previous may be null
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2744,7 +2743,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2744,7 +2743,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
case
TSDB_DATA_TYPE_BIGINT
:
{
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
v
=
*
(
int64_t
*
)
pv
;
int64_t
v
=
*
(
int64_t
*
)
pv
;
int64_t
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
i64
)
;
// direct previous may be null
int64_t
delta
=
v
-
pDiffInfo
->
prev
.
i64
;
// direct previous may be null
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
if
(
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2755,7 +2754,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2755,7 +2754,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
}
}
case
TSDB_DATA_TYPE_FLOAT
:
{
case
TSDB_DATA_TYPE_FLOAT
:
{
float
v
=
*
(
float
*
)
pv
;
float
v
=
*
(
float
*
)
pv
;
double
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
d64
);
// direct previous may be null
double
delta
=
v
-
pDiffInfo
->
prev
.
d64
;
// direct previous may be null
if
((
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
||
isinf
(
delta
)
||
isnan
(
delta
))
{
// check for overflow
if
((
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
||
isinf
(
delta
)
||
isnan
(
delta
))
{
// check for overflow
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2766,7 +2765,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
...
@@ -2766,7 +2765,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
}
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
v
=
*
(
double
*
)
pv
;
double
v
=
*
(
double
*
)
pv
;
double
delta
=
factor
*
(
v
-
pDiffInfo
->
prev
.
d64
);
// direct previous may be null
double
delta
=
v
-
pDiffInfo
->
prev
.
d64
;
// direct previous may be null
if
((
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
||
isinf
(
delta
)
||
isnan
(
delta
))
{
// check for overflow
if
((
delta
<
0
&&
pDiffInfo
->
ignoreNegative
)
||
isinf
(
delta
)
||
isnan
(
delta
))
{
// check for overflow
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
}
else
{
}
else
{
...
@@ -2797,82 +2796,42 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
...
@@ -2797,82 +2796,42 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
SColumnInfoData
*
pOutput
=
(
SColumnInfoData
*
)
pCtx
->
pOutput
;
if
(
pCtx
->
order
==
TSDB_ORDER_ASC
)
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
int32_t
pos
=
startOffset
+
numOfElems
;
int32_t
pos
=
startOffset
+
numOfElems
;
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
pDiffInfo
->
includeNull
)
{
if
(
pDiffInfo
->
includeNull
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
colDataSetNull_f_s
(
pOutput
,
pos
);
numOfElems
+=
1
;
numOfElems
+=
1
;
}
continue
;
}
}
continue
;
}
char
*
pv
=
colDataGetData
(
pInputCol
,
i
);
char
*
pv
=
colDataGetData
(
pInputCol
,
i
);
if
(
pDiffInfo
->
hasPrev
)
{
if
(
pDiffInfo
->
hasPrev
)
{
if
(
tsList
[
i
]
==
pDiffInfo
->
prevTs
)
{
if
(
tsList
[
i
]
==
pDiffInfo
->
prevTs
)
{
return
TSDB_CODE_FUNC_DUP_TIMESTAMP
;
return
TSDB_CODE_FUNC_DUP_TIMESTAMP
;
}
int32_t
code
=
doHandleDiff
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
pOutput
,
pos
,
pCtx
->
order
,
tsList
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// handle selectivity
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
appendSelectivityValue
(
pCtx
,
i
,
pos
);
}
numOfElems
++
;
}
else
{
int32_t
code
=
doSetPrevVal
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
tsList
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
int32_t
code
=
doHandleDiff
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
pOutput
,
pos
,
tsList
[
i
]);
pDiffInfo
->
hasPrev
=
true
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
}
return
code
;
}
else
{
for
(
int32_t
i
=
pInput
->
startRowIndex
;
i
<
pInput
->
numOfRows
+
pInput
->
startRowIndex
;
i
+=
1
)
{
int32_t
pos
=
startOffset
+
numOfElems
;
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
pDiffInfo
->
includeNull
)
{
colDataSetNull_f_s
(
pOutput
,
pos
);
numOfElems
+=
1
;
}
continue
;
}
}
// handle selectivity
char
*
pv
=
colDataGetData
(
pInputCol
,
i
);
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
appendSelectivityValue
(
pCtx
,
i
,
pos
);
// there is a row of previous data block to be handled in the first place.
if
(
pDiffInfo
->
hasPrev
)
{
if
(
tsList
[
i
]
==
pDiffInfo
->
prevTs
)
{
return
TSDB_CODE_FUNC_DUP_TIMESTAMP
;
}
int32_t
code
=
doHandleDiff
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
pOutput
,
pos
,
pCtx
->
order
,
tsList
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
// handle selectivity
if
(
pCtx
->
subsidiaries
.
num
>
0
)
{
appendSelectivityValue
(
pCtx
,
i
,
pos
);
}
numOfElems
++
;
}
else
{
int32_t
code
=
doSetPrevVal
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
tsList
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
pDiffInfo
->
hasPrev
=
true
;
numOfElems
++
;
}
else
{
int32_t
code
=
doSetPrevVal
(
pDiffInfo
,
pInputCol
->
info
.
type
,
pv
,
tsList
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
pDiffInfo
->
hasPrev
=
true
;
}
}
pResInfo
->
numOfRes
=
numOfElems
;
pResInfo
->
numOfRes
=
numOfElems
;
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
47ae92af
...
@@ -602,18 +602,18 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
...
@@ -602,18 +602,18 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
// ftruncate idx file
// ftruncate idx file
if
(
offset
<
fileSize
)
{
if
(
offset
<
fileSize
)
{
if
(
taosFtruncateFile
(
pIdxFile
,
offset
)
<
0
)
{
if
(
taosFtruncateFile
(
pIdxFile
,
offset
)
<
0
)
{
wError
(
"vgId:%d, failed to ftruncate file due to %s. offset:%"
PRId64
", file:%s"
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
offset
,
fnameStr
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to ftruncate file since %s. offset:%"
PRId64
", file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
offset
,
fnameStr
);
goto
_err
;
goto
_err
;
}
}
}
}
// rebuild idx file
// rebuild idx file
if
(
taosLSeekFile
(
pIdxFile
,
0
,
SEEK_END
)
<
0
)
{
if
(
taosLSeekFile
(
pIdxFile
,
0
,
SEEK_END
)
<
0
)
{
wError
(
"vgId:%d, failed to seek file due to %s. offset:%"
PRId64
", file:%s"
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
offset
,
fnameStr
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to seek file since %s. offset:%"
PRId64
", file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
offset
,
fnameStr
);
goto
_err
;
goto
_err
;
}
}
...
@@ -625,11 +625,12 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
...
@@ -625,11 +625,12 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
idxEntry
.
offset
+=
sizeof
(
SWalCkHead
)
+
ckHead
.
head
.
bodyLen
;
idxEntry
.
offset
+=
sizeof
(
SWalCkHead
)
+
ckHead
.
head
.
bodyLen
;
if
(
walReadLogHead
(
pLogFile
,
idxEntry
.
offset
,
&
ckHead
)
<
0
)
{
if
(
walReadLogHead
(
pLogFile
,
idxEntry
.
offset
,
&
ckHead
)
<
0
)
{
wError
(
"vgId:%d, failed to read wal log head since %s.
offset:%"
PRId64
", file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
()
,
wError
(
"vgId:%d, failed to read wal log head since %s.
index:%"
PRId64
", offset:%"
PRId64
", file:%s"
,
idxEntry
.
offset
,
fLogNameStr
);
pWal
->
cfg
.
vgId
,
terrstr
(),
idxEntry
.
ver
,
idxEntry
.
offset
,
fLogNameStr
);
goto
_err
;
goto
_err
;
}
}
if
(
taosWriteFile
(
pIdxFile
,
&
idxEntry
,
sizeof
(
SWalIdxEntry
))
<
0
)
{
if
(
taosWriteFile
(
pIdxFile
,
&
idxEntry
,
sizeof
(
SWalIdxEntry
))
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, failed to append file since %s. file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
fnameStr
);
wError
(
"vgId:%d, failed to append file since %s. file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
fnameStr
);
goto
_err
;
goto
_err
;
}
}
...
@@ -637,6 +638,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
...
@@ -637,6 +638,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) {
}
}
if
(
taosFsyncFile
(
pIdxFile
)
<
0
)
{
if
(
taosFsyncFile
(
pIdxFile
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, faild to fsync file since %s. file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
fnameStr
);
wError
(
"vgId:%d, faild to fsync file since %s. file:%s"
,
pWal
->
cfg
.
vgId
,
terrstr
(),
fnameStr
);
goto
_err
;
goto
_err
;
}
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
47ae92af
...
@@ -473,7 +473,10 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
...
@@ -473,7 +473,10 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
// check alignment of idx entries
// check alignment of idx entries
int64_t
endOffset
=
taosLSeekFile
(
pWal
->
pIdxFile
,
0
,
SEEK_END
);
int64_t
endOffset
=
taosLSeekFile
(
pWal
->
pIdxFile
,
0
,
SEEK_END
);
if
(
endOffset
<
0
)
{
if
(
endOffset
<
0
)
{
wFatal
(
"vgId:%d, failed to seek end of idxfile due to %s. ver:%"
PRId64
""
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
ver
);
wFatal
(
"vgId:%d, failed to seek end of WAL idxfile due to %s. ver:%"
PRId64
""
,
pWal
->
cfg
.
vgId
,
strerror
(
errno
),
ver
);
taosMsleep
(
100
);
exit
(
EXIT_FAILURE
);
}
}
return
0
;
return
0
;
}
}
...
@@ -533,16 +536,20 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
...
@@ -533,16 +536,20 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
END:
END:
// recover in a reverse order
// recover in a reverse order
if
(
taosFtruncateFile
(
pWal
->
pLogFile
,
offset
)
<
0
)
{
if
(
taosFtruncateFile
(
pWal
->
pLogFile
,
offset
)
<
0
)
{
wFatal
(
"vgId:%d, failed to ftruncate logfile to offset:%"
PRId64
" during recovery due to %s"
,
pWal
->
cfg
.
vgId
,
offset
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wFatal
(
"vgId:%d, failed to recover WAL logfile from write error since %s, offset:%"
PRId64
,
pWal
->
cfg
.
vgId
,
terrstr
(),
offset
);
taosMsleep
(
100
);
exit
(
EXIT_FAILURE
);
}
}
int64_t
idxOffset
=
(
index
-
pFileInfo
->
firstVer
)
*
sizeof
(
SWalIdxEntry
);
int64_t
idxOffset
=
(
index
-
pFileInfo
->
firstVer
)
*
sizeof
(
SWalIdxEntry
);
if
(
taosFtruncateFile
(
pWal
->
pIdxFile
,
idxOffset
)
<
0
)
{
if
(
taosFtruncateFile
(
pWal
->
pIdxFile
,
idxOffset
)
<
0
)
{
wFatal
(
"vgId:%d, failed to ftruncate idxfile to offset:%"
PRId64
"during recovery due to %s"
,
pWal
->
cfg
.
vgId
,
idxOffset
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wFatal
(
"vgId:%d, failed to recover WAL idxfile from write error since %s, offset:%"
PRId64
,
pWal
->
cfg
.
vgId
,
terrstr
(),
idxOffset
);
taosMsleep
(
100
);
exit
(
EXIT_FAILURE
);
}
}
return
-
1
;
return
-
1
;
}
}
...
...
source/util/src/tlog.c
浏览文件 @
47ae92af
...
@@ -486,24 +486,11 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
...
@@ -486,24 +486,11 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
static
inline
void
taosPrintLogImp
(
ELogLevel
level
,
int32_t
dflag
,
const
char
*
buffer
,
int32_t
len
)
{
static
inline
void
taosPrintLogImp
(
ELogLevel
level
,
int32_t
dflag
,
const
char
*
buffer
,
int32_t
len
)
{
if
((
dflag
&
DEBUG_FILE
)
&&
tsLogObj
.
logHandle
&&
tsLogObj
.
logHandle
->
pFile
!=
NULL
&&
osLogSpaceAvailable
())
{
if
((
dflag
&
DEBUG_FILE
)
&&
tsLogObj
.
logHandle
&&
tsLogObj
.
logHandle
->
pFile
!=
NULL
&&
osLogSpaceAvailable
())
{
taosUpdateLogNums
(
level
);
taosUpdateLogNums
(
level
);
#if 0
// DEBUG_FATAL and DEBUG_ERROR are duplicated
// fsync will cause thread blocking and may also generate log misalignment in case of asyncLog
if (tsAsyncLog && level != DEBUG_FATAL) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
if (level == DEBUG_FATAL) {
taosFsyncFile(tsLogObj.logHandle->pFile);
}
}
#else
if
(
tsAsyncLog
)
{
if
(
tsAsyncLog
)
{
taosPushLogBuffer
(
tsLogObj
.
logHandle
,
buffer
,
len
);
taosPushLogBuffer
(
tsLogObj
.
logHandle
,
buffer
,
len
);
}
else
{
}
else
{
taosWriteFile
(
tsLogObj
.
logHandle
->
pFile
,
buffer
,
len
);
taosWriteFile
(
tsLogObj
.
logHandle
->
pFile
,
buffer
,
len
);
}
}
#endif
if
(
tsLogObj
.
maxLines
>
0
)
{
if
(
tsLogObj
.
maxLines
>
0
)
{
atomic_add_fetch_32
(
&
tsLogObj
.
lines
,
1
);
atomic_add_fetch_32
(
&
tsLogObj
.
lines
,
1
);
...
...
tests/parallel_test/cases.task
浏览文件 @
47ae92af
...
@@ -36,6 +36,10 @@
...
@@ -36,6 +36,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3
...
...
tests/parallel_test/run_case.sh
浏览文件 @
47ae92af
...
@@ -76,10 +76,10 @@ ulimit -c unlimited
...
@@ -76,10 +76,10 @@ ulimit -c unlimited
md5sum
/usr/lib/libtaos.so.1
md5sum
/usr/lib/libtaos.so.1
md5sum
/home/TDinternal/debug/build/lib/libtaos.so
md5sum
/home/TDinternal/debug/build/lib/libtaos.so
#define taospy 2.7.
6
#define taospy 2.7.
10
pip3 list|grep taospy
pip3 list|grep taospy
pip3 uninstall taospy
-y
pip3 uninstall taospy
-y
pip3
install
--default-timeout
=
120
taospy
==
2.7.
6
pip3
install
--default-timeout
=
120
taospy
==
2.7.
10
$TIMEOUT_CMD
$cmd
$TIMEOUT_CMD
$cmd
RET
=
$?
RET
=
$?
...
...
tests/system-test/2-query/diff.py
浏览文件 @
47ae92af
...
@@ -23,7 +23,7 @@ class TDTestCase:
...
@@ -23,7 +23,7 @@ class TDTestCase:
tdSql
.
execute
(
tdSql
.
execute
(
f
"create table
{
dbname
}
.ntb(ts timestamp,c1 int,c2 double,c3 float)"
)
f
"create table
{
dbname
}
.ntb(ts timestamp,c1 int,c2 double,c3 float)"
)
tdSql
.
execute
(
tdSql
.
execute
(
f
"insert into
{
dbname
}
.ntb values(now,1,1.0,10.5)(now+1s,10,-100.0,5.1)(now+10s
,-1,15.1,5.0)"
)
f
"insert into
{
dbname
}
.ntb values('2023-01-01 00:00:01',1,1.0,10.5)('2023-01-01 00:00:02',10,-100.0,5.1)('2023-01-01 00:00:03'
,-1,15.1,5.0)"
)
tdSql
.
query
(
f
"select diff(c1,0) from
{
dbname
}
.ntb"
)
tdSql
.
query
(
f
"select diff(c1,0) from
{
dbname
}
.ntb"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkRows
(
2
)
...
@@ -233,6 +233,40 @@ class TDTestCase:
...
@@ -233,6 +233,40 @@ class TDTestCase:
tdSql
.
checkRows
(
19
)
tdSql
.
checkRows
(
19
)
tdSql
.
checkData
(
0
,
0
,
None
)
tdSql
.
checkData
(
0
,
0
,
None
)
# TD-25098
tdSql
.
query
(
f
"select ts, diff(c1) from
{
dbname
}
.ntb order by ts"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
'2023-01-01 00:00:02.000'
)
tdSql
.
checkData
(
1
,
0
,
'2023-01-01 00:00:03.000'
)
tdSql
.
checkData
(
0
,
1
,
9
)
tdSql
.
checkData
(
1
,
1
,
-
11
)
tdSql
.
query
(
f
"select ts, diff(c1) from
{
dbname
}
.ntb order by ts desc"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
'2023-01-01 00:00:03.000'
)
tdSql
.
checkData
(
1
,
0
,
'2023-01-01 00:00:02.000'
)
tdSql
.
checkData
(
0
,
1
,
-
11
)
tdSql
.
checkData
(
1
,
1
,
9
)
tdSql
.
query
(
f
"select ts, diff(c1) from (select * from
{
dbname
}
.ntb order by ts)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
'2023-01-01 00:00:02.000'
)
tdSql
.
checkData
(
1
,
0
,
'2023-01-01 00:00:03.000'
)
tdSql
.
checkData
(
0
,
1
,
9
)
tdSql
.
checkData
(
1
,
1
,
-
11
)
tdSql
.
query
(
f
"select ts, diff(c1) from (select * from
{
dbname
}
.ntb order by ts desc)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
'2023-01-01 00:00:02.000'
)
tdSql
.
checkData
(
1
,
0
,
'2023-01-01 00:00:01.000'
)
tdSql
.
checkData
(
0
,
1
,
11
)
tdSql
.
checkData
(
1
,
1
,
-
9
)
def
stop
(
self
):
def
stop
(
self
):
tdSql
.
close
()
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
47ae92af
...
@@ -37,6 +37,9 @@ from util.common import *
...
@@ -37,6 +37,9 @@ from util.common import *
# INSERT_DATA = 3
# INSERT_DATA = 3
class
TMQCom
:
class
TMQCom
:
def
__init__
(
self
):
self
.
g_end_insert_flag
=
0
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
self
.
replicaVar
=
int
(
replicaVar
)
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
init
(
conn
.
cursor
())
...
@@ -330,8 +333,11 @@ class TMQCom:
...
@@ -330,8 +333,11 @@ class TMQCom:
ctbDict
[
i
]
=
0
ctbDict
[
i
]
=
0
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfCtb
=
0
rowsOfCtb
=
0
while
rowsOfCtb
<
rowsPerTbl
:
while
rowsOfCtb
<
rowsPerTbl
:
if
(
0
!=
self
.
g_end_insert_flag
):
tdLog
.
debug
(
"get signal to stop insert data"
)
break
for
i
in
range
(
ctbNum
):
for
i
in
range
(
ctbNum
):
sql
+=
" %s.%s%d values "
%
(
dbName
,
ctbPrefix
,
i
+
ctbStartIdx
)
sql
+=
" %s.%s%d values "
%
(
dbName
,
ctbPrefix
,
i
+
ctbStartIdx
)
rowsBatched
=
0
rowsBatched
=
0
...
@@ -571,6 +577,20 @@ class TMQCom:
...
@@ -571,6 +577,20 @@ class TMQCom:
tdLog
.
info
(
tsql
.
queryResult
)
tdLog
.
info
(
tsql
.
queryResult
)
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
def
killProcesser
(
self
,
processerName
):
killCmd
=
(
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1"
%
processerName
)
psCmd
=
"ps -ef|grep -w %s| grep -v grep | awk '{print $2}'"
%
processerName
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
while
processID
:
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
)
def
close
(
self
):
def
close
(
self
):
self
.
cursor
.
close
()
self
.
cursor
.
close
()
...
...
tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py
0 → 100644
浏览文件 @
47ae92af
import
sys
import
time
import
datetime
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
100
self
.
tmqMaxTopicNum
=
1
self
.
tmqMaxGroups
=
1
self
.
walRetentionPeriod
=
3
self
.
actConsumeTotalRows
=
0
self
.
retryPoll
=
0
self
.
lock
=
threading
.
Lock
()
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
1
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period %d"
%
(
paraDict
[
'dbName'
],
self
.
walRetentionPeriod
))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tdLog.info("insert data")
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def
tmqSubscribe
(
self
,
**
inputDict
):
consumer_dict
=
{
"group.id"
:
inputDict
[
'group_id'
],
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"100"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
consumer
=
Consumer
(
consumer_dict
)
consumer
.
subscribe
([
inputDict
[
'topic_name'
]])
onceFlag
=
0
try
:
while
True
:
if
(
1
==
self
.
retryPoll
):
time
.
sleep
(
2
)
continue
res
=
consumer
.
poll
(
inputDict
[
'pollDelay'
])
if
not
res
:
break
err
=
res
.
error
()
if
err
is
not
None
:
raise
err
val
=
res
.
value
()
for
block
in
val
:
# print(block.fetchall())
data
=
block
.
fetchall
()
for
row
in
data
:
# print("===================================")
# print(row)
self
.
actConsumeTotalRows
+=
1
if
(
0
==
onceFlag
):
onceFlag
=
1
with
self
.
lock
:
self
.
retryPoll
=
1
currentTime
=
datetime
.
now
()
print
(
"%s temp stop consume"
%
(
str
(
currentTime
)))
currentTime
=
datetime
.
now
()
print
(
"%s already consume rows: %d, and sleep for a while"
%
(
str
(
currentTime
),
self
.
actConsumeTotalRows
))
# time.sleep(self.walRetentionPeriod * 3)
finally
:
consumer
.
unsubscribe
()
consumer
.
close
()
return
def
asyncSubscribe
(
self
,
inputDict
):
pThread
=
threading
.
Thread
(
target
=
self
.
tmqSubscribe
,
kwargs
=
inputDict
)
pThread
.
start
()
return
pThread
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100
,
'batchNum'
:
1
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
# create topic
topicNameList
=
[
'dbtstb_0001'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# start consumer
inputDict
=
{
'group_id'
:
"grpid_0001"
,
'topic_name'
:
topicNameList
[
0
],
'pollDelay'
:
10
}
pThread2
=
self
.
asyncSubscribe
(
inputDict
)
pThread1
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
pThread1
.
join
()
tdLog
.
info
(
"firstly call to flash database"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
time
.
sleep
(
self
.
walRetentionPeriod
+
1
)
tdLog
.
info
(
"secondely call to flash database"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
# wait the consumer to complete one poll
while
(
0
==
self
.
retryPoll
):
time
.
sleep
(
1
)
continue
with
self
.
lock
:
self
.
retryPoll
=
0
currentTime
=
datetime
.
now
()
print
(
"%s restart consume"
%
(
str
(
currentTime
)))
paraDict
[
"startTs"
]
=
1640966400000
+
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
pThread3
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
debug
(
"wait sub-thread to end insert data"
)
pThread3
.
join
()
totalInsertRows
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
*
2
tdLog
.
debug
(
"wait sub-thread to end consume data"
)
pThread2
.
join
()
tdLog
.
info
(
"act consume total rows: %d, act insert total rows: %d"
%
(
self
.
actConsumeTotalRows
,
totalInsertRows
))
if
(
self
.
actConsumeTotalRows
>=
totalInsertRows
):
tdLog
.
exit
(
"act consume rows: %d not equal expect: %d"
%
(
self
.
actConsumeTotalRows
,
totalInsertRows
))
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqDropConsumer.json
0 → 100644
浏览文件 @
47ae92af
{
"filetype"
:
"subscribe"
,
"cfgdir"
:
"/etc/taos"
,
"host"
:
"127.0.0.1"
,
"port"
:
6030
,
"user"
:
"root"
,
"password"
:
"taosdata"
,
"result_file"
:
"tmq_res.txt"
,
"tmq_info"
:
{
"concurrent"
:
2
,
"poll_delay"
:
100000
,
"group.id"
:
""
,
"group_mode"
:
"independent"
,
"create_mode"
:
"parallel"
,
"client.id"
:
"cliid_0001"
,
"auto.offset.reset"
:
"earliest"
,
"enable.manual.commit"
:
"false"
,
"enable.auto.commit"
:
"false"
,
"auto.commit.interval.ms"
:
1000
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
,
"rows_file"
:
""
,
"topic_list"
:
[
{
"name"
:
"dbtstb_0001"
,
"sql"
:
"select * from dbt.stb;"
},
{
"name"
:
"dbtstb_0002"
,
"sql"
:
"select * from dbt.stb;"
}
]
}
}
tests/system-test/7-tmq/tmqDropConsumer.py
0 → 100644
浏览文件 @
47ae92af
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
2
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
2
self
.
tmqMaxGroups
=
2
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
2
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 360000"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
topicName
,
newGroupId
,
expectResult
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
newGroupId
,
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
ret
=
'success'
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
try
:
consumer
.
subscribe
([
topicName
])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
if
(
expectResult
==
"fail"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
tdLog
.
info
(
"consumer.subscribe() success "
)
if
(
expectResult
==
"success"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100000000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'dbtstb_0001'
,
'dbtstb_0002'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query('show topics;')
# topicNum = tdSql.queryRows
# tdLog.info(" topic count: %d"%(topicNum))
# if topicNum != len(topicNameList):
# tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList)))
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
# use taosBenchmark to subscribe
binPath
=
self
.
getPath
()
cmd
=
"nohup %s -f ./7-tmq/tmqDropConsumer.json > /dev/null 2>&1 & "
%
binPath
tdLog
.
info
(
"%s"
%
(
cmd
))
os
.
system
(
cmd
)
expectTopicNum
=
len
(
topicNameList
)
consumerThreadNum
=
2
expectConsumerNUm
=
expectTopicNum
*
consumerThreadNum
expectSubscribeNum
=
self
.
vgroups
*
expectTopicNum
*
consumerThreadNum
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get topic count: %d"
%
(
topicNum
))
if
topicNum
!=
expectTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
expectTopicNum
))
flag
=
0
while
(
1
):
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
tdLog
.
info
(
" get consumers count: %d"
%
(
consumerNUm
))
if
consumerNUm
==
expectConsumerNUm
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show consumers %d not equal expect num: %d"
%
(
topicNum
,
expectConsumerNUm
))
flag
=
0
for
i
in
range
(
10
):
tdSql
.
query
(
'show subscriptions;'
)
subscribeNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get subscriptions count: %d"
%
(
subscribeNum
))
if
subscribeNum
==
expectSubscribeNum
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
exit
(
"show subscriptions %d not equal expect num: %d"
%
(
subscribeNum
,
expectSubscribeNum
))
# get all consumer group id
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
groupIdList
=
[]
for
i
in
range
(
consumerNUm
):
groupId
=
tdSql
.
getData
(
i
,
1
)
existFlag
=
0
for
j
in
range
(
len
(
groupIdList
)):
if
(
groupId
==
groupIdList
[
j
]):
existFlag
=
1
break
if
(
0
==
existFlag
):
groupIdList
.
append
(
groupId
)
# kill taosBenchmark
tmqCom
.
killProcesser
(
"taosBenchmark"
)
tdLog
.
info
(
"kill taosBenchmak end"
)
# wait the status to "lost"
while
(
1
):
exitFlag
=
1
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
for
i
in
range
(
consumerNUm
):
status
=
tdSql
.
getData
(
i
,
3
)
if
(
status
!=
"lost"
):
exitFlag
=
0
time
.
sleep
(
2
)
break
if
(
1
==
exitFlag
):
break
tdLog
.
info
(
"all consumers status into 'lost'"
)
# drop consumer groups
for
i
in
range
(
len
(
groupIdList
)):
for
j
in
range
(
len
(
topicNameList
)):
sqlCmd
=
f
"drop consumer group `%s` on %s"
%
(
groupIdList
[
i
],
topicNameList
[
j
])
tdLog
.
info
(
"drop consumer cmd: %s"
%
(
sqlCmd
))
tdSql
.
execute
(
sqlCmd
)
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
debug
(
"notify sub-thread to stop insert data"
)
pThread
.
join
()
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqMaxGroupIds.json
0 → 100644
浏览文件 @
47ae92af
{
"filetype"
:
"subscribe"
,
"cfgdir"
:
"/etc/taos"
,
"host"
:
"127.0.0.1"
,
"port"
:
6030
,
"user"
:
"root"
,
"password"
:
"taosdata"
,
"result_file"
:
"tmq_res.txt"
,
"tmq_info"
:
{
"concurrent"
:
99
,
"poll_delay"
:
100000
,
"group.id"
:
""
,
"group_mode"
:
"independent"
,
"create_mode"
:
"parallel"
,
"client.id"
:
"cliid_0001"
,
"auto.offset.reset"
:
"earliest"
,
"enable.manual.commit"
:
"false"
,
"enable.auto.commit"
:
"false"
,
"auto.commit.interval.ms"
:
1000
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
,
"rows_file"
:
""
,
"topic_list"
:
[
{
"name"
:
"dbtstb_0001"
,
"sql"
:
"select * from dbt.stb;"
}
]
}
}
tests/system-test/7-tmq/tmqMaxGroupIds.py
0 → 100644
浏览文件 @
47ae92af
import
sys
import
time
import
threading
from
taos.tmq
import
Consumer
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
10
self
.
rowsPerTbl
=
10
self
.
tmqMaxTopicNum
=
20
self
.
tmqMaxGroups
=
100
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
getPath
(
self
,
tool
=
"taosBenchmark"
):
if
(
platform
.
system
().
lower
()
==
'windows'
):
tool
=
tool
+
".exe"
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
paths
=
[]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
((
tool
)
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
paths
.
append
(
os
.
path
.
join
(
root
,
tool
))
break
if
(
len
(
paths
)
==
0
):
tdLog
.
exit
(
"taosBenchmark not found!"
)
return
else
:
tdLog
.
info
(
"taosBenchmark found in %s"
%
paths
[
0
])
return
paths
[
0
]
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdSql
.
execute
(
"alter database %s wal_retention_period 360000"
%
(
paraDict
[
'dbName'
]))
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqSubscribe
(
self
,
topicName
,
newGroupId
,
expectResult
):
# create new connector for new tdSql instance in my thread
# newTdSql = tdCom.newTdSql()
# topicName = inputDict['topic_name']
# group_id = inputDict['group_id']
consumer_dict
=
{
"group.id"
:
newGroupId
,
"client.id"
:
"client"
,
"td.connect.user"
:
"root"
,
"td.connect.pass"
:
"taosdata"
,
"auto.commit.interval.ms"
:
"1000"
,
"enable.auto.commit"
:
"true"
,
"auto.offset.reset"
:
"earliest"
,
"experimental.snapshot.enable"
:
"false"
,
"msg.with.table.name"
:
"false"
}
ret
=
'success'
consumer
=
Consumer
(
consumer_dict
)
# print("======%s"%(inputDict['topic_name']))
try
:
consumer
.
subscribe
([
topicName
])
except
Exception
as
e
:
tdLog
.
info
(
"consumer.subscribe() fail "
)
tdLog
.
info
(
"%s"
%
(
e
))
if
(
expectResult
==
"fail"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
tdLog
.
info
(
"consumer.subscribe() success "
)
if
(
expectResult
==
"success"
):
consumer
.
close
()
return
'success'
else
:
consumer
.
close
()
return
'fail'
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
100000000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'dbtstb_0001'
]
tdLog
.
info
(
"create topics from stb"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
for
i
in
range
(
len
(
topicNameList
)):
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
i
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query('show topics;')
# topicNum = tdSql.queryRows
# tdLog.info(" topic count: %d"%(topicNum))
# if topicNum != len(topicNameList):
# tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList)))
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
# use taosBenchmark to subscribe
binPath
=
self
.
getPath
()
cmd
=
"nohup %s -f ./7-tmq/tmqMaxGroupIds.json > /dev/null 2>&1 & "
%
binPath
tdLog
.
info
(
"%s"
%
(
cmd
))
os
.
system
(
cmd
)
expectTopicNum
=
1
expectConsumerNUm
=
99
expectSubscribeNum
=
99
tdSql
.
query
(
'show topics;'
)
topicNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get topic count: %d"
%
(
topicNum
))
if
topicNum
!=
expectTopicNum
:
tdLog
.
exit
(
"show topics %d not equal expect num: %d"
%
(
topicNum
,
expectTopicNum
))
flag
=
0
while
(
1
):
tdSql
.
query
(
'show consumers;'
)
consumerNUm
=
tdSql
.
queryRows
tdLog
.
info
(
" get consumers count: %d"
%
(
consumerNUm
))
if
consumerNUm
==
expectConsumerNUm
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tdLog
.
exit
(
"show consumers %d not equal expect num: %d"
%
(
topicNum
,
expectConsumerNUm
))
flag
=
0
for
i
in
range
(
10
):
tdSql
.
query
(
'show subscriptions;'
)
subscribeNum
=
tdSql
.
queryRows
tdLog
.
info
(
" get subscriptions count: %d"
%
(
subscribeNum
))
if
subscribeNum
==
expectSubscribeNum
:
flag
=
1
break
else
:
time
.
sleep
(
1
)
if
(
0
==
flag
):
tdLog
.
exit
(
"show subscriptions %d not equal expect num: %d"
%
(
subscribeNum
,
expectSubscribeNum
))
res
=
self
.
tmqSubscribe
(
topicNameList
[
0
],
"newGroupId_001"
,
"success"
)
if
res
!=
'success'
:
tdLog
.
exit
(
"limit max groupid fail"
)
res
=
self
.
tmqSubscribe
(
topicNameList
[
0
],
"newGroupId_002"
,
"fail"
)
if
res
!=
'success'
:
tdLog
.
exit
(
"limit max groupid fail"
)
tmqCom
.
g_end_insert_flag
=
1
tdLog
.
debug
(
"notify sub-thread to stop insert data"
)
pThread
.
join
()
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
self
.
prepareTestEnv
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tools/shell/src/shellEngine.c
浏览文件 @
47ae92af
...
@@ -361,11 +361,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
...
@@ -361,11 +361,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_FLOAT
:
width
=
SHELL_FLOAT_WIDTH
;
width
=
SHELL_FLOAT_WIDTH
;
if
(
tsEnableScience
)
{
if
(
tsEnableScience
)
{
taosFprintfFile
(
pFile
,
"%*e"
,
width
,
GET_FLOAT_VAL
(
val
));
taosFprintfFile
(
pFile
,
"%*
.7
e"
,
width
,
GET_FLOAT_VAL
(
val
));
}
else
{
}
else
{
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
5
f"
,
width
,
GET_FLOAT_VAL
(
val
));
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
7
f"
,
width
,
GET_FLOAT_VAL
(
val
));
if
(
n
>
SHELL_FLOAT_WIDTH
)
{
if
(
n
>
SHELL_FLOAT_WIDTH
)
{
taosFprintfFile
(
pFile
,
"%*e"
,
width
,
GET_FLOAT_VAL
(
val
));
taosFprintfFile
(
pFile
,
"%*
.7
e"
,
width
,
GET_FLOAT_VAL
(
val
));
}
else
{
}
else
{
taosFprintfFile
(
pFile
,
"%s"
,
buf
);
taosFprintfFile
(
pFile
,
"%s"
,
buf
);
}
}
...
@@ -374,10 +374,10 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
...
@@ -374,10 +374,10 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_DOUBLE
:
width
=
SHELL_DOUBLE_WIDTH
;
width
=
SHELL_DOUBLE_WIDTH
;
if
(
tsEnableScience
)
{
if
(
tsEnableScience
)
{
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%
.9e"
,
GET_DOUBLE_VAL
(
val
));
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%
*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
taosFprintfFile
(
pFile
,
"%
*s"
,
width
,
buf
);
taosFprintfFile
(
pFile
,
"%
s"
,
buf
);
}
else
{
}
else
{
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
9
f"
,
width
,
GET_DOUBLE_VAL
(
val
));
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
15
f"
,
width
,
GET_DOUBLE_VAL
(
val
));
if
(
n
>
SHELL_DOUBLE_WIDTH
)
{
if
(
n
>
SHELL_DOUBLE_WIDTH
)
{
taosFprintfFile
(
pFile
,
"%*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
taosFprintfFile
(
pFile
,
"%*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
}
else
{
}
else
{
...
@@ -612,11 +612,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
...
@@ -612,11 +612,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
break
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_FLOAT
:
if
(
tsEnableScience
)
{
if
(
tsEnableScience
)
{
printf
(
"%*
e"
,
width
,
GET_FLOAT_VAL
(
val
));
printf
(
"%*
.7e"
,
width
,
GET_FLOAT_VAL
(
val
));
}
else
{
}
else
{
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
5
f"
,
width
,
GET_FLOAT_VAL
(
val
));
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
7
f"
,
width
,
GET_FLOAT_VAL
(
val
));
if
(
n
>
SHELL_FLOAT_WIDTH
)
{
if
(
n
>
SHELL_FLOAT_WIDTH
)
{
printf
(
"%*e"
,
width
,
GET_FLOAT_VAL
(
val
));
printf
(
"%*.7e"
,
width
,
GET_FLOAT_VAL
(
val
));
}
else
{
}
else
{
printf
(
"%s"
,
buf
);
printf
(
"%s"
,
buf
);
}
}
...
@@ -624,14 +625,14 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
...
@@ -624,14 +625,14 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
break
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_DOUBLE
:
if
(
tsEnableScience
)
{
if
(
tsEnableScience
)
{
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%
.9e"
,
GET_DOUBLE_VAL
(
val
));
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%
*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
printf
(
"%
*s"
,
width
,
buf
);
printf
(
"%
s"
,
buf
);
}
else
{
}
else
{
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
9
f"
,
width
,
GET_DOUBLE_VAL
(
val
));
n
=
snprintf
(
buf
,
TSDB_MAX_BYTES_PER_ROW
,
"%*.
15
f"
,
width
,
GET_DOUBLE_VAL
(
val
));
if
(
n
>
SHELL_DOUBLE_WIDTH
)
{
if
(
n
>
SHELL_DOUBLE_WIDTH
)
{
printf
(
"%*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
printf
(
"%*.15e"
,
width
,
GET_DOUBLE_VAL
(
val
));
}
else
{
}
else
{
printf
(
"%
s"
,
buf
);
printf
(
"%
*s"
,
width
,
buf
);
}
}
}
}
break
;
break
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录