Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2468013
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f2468013
编写于
7月 01, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/dnode
上级
7f6778c5
2c64b170
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
142 addition
and
26 deletion
+142
-26
include/libs/wal/wal.h
include/libs/wal/wal.h
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+2
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-5
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+4
-0
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+3
-3
tests/system-test/7-tmq/tmqConsFromTsdb.py
tests/system-test/7-tmq/tmqConsFromTsdb.py
+107
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+23
-16
未找到文件。
include/libs/wal/wal.h
浏览文件 @
f2468013
...
...
@@ -210,6 +210,8 @@ void walCloseRef(SWalRef *);
int32_t
walRefVer
(
SWalRef
*
,
int64_t
ver
);
int32_t
walUnrefVer
(
SWal
*
);
bool
walLogExist
(
SWal
*
,
int64_t
ver
);
// lifecycle check
bool
walIsEmpty
(
SWal
*
);
int64_t
walGetFirstVer
(
SWal
*
);
...
...
source/client/src/tmq.c
浏览文件 @
f2468013
...
...
@@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
char
offsetFormatBuf
[
5
0
];
tFormatOffset
(
offsetFormatBuf
,
5
0
,
&
pVg
->
currentOffsetNew
);
char
offsetFormatBuf
[
8
0
];
tFormatOffset
(
offsetFormatBuf
,
8
0
,
&
pVg
->
currentOffsetNew
);
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
offsetFormatBuf
,
pReq
->
reqId
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f2468013
...
...
@@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug
(
"tmq poll: consumer %ld, offset reset to %s"
,
consumerId
,
formatBuf
);
}
else
{
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEAST
)
{
if
(
pReq
->
useSnapshot
)
{
if
(
pReq
->
useSnapshot
&&
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
!
pHandle
->
fetchMeta
)
{
tqOffsetResetToData
(
&
fetchOffsetNew
,
0
,
0
);
}
else
{
...
...
@@ -375,10 +375,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree
(
pHeadWithCkSum
);
}
else
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
// 1. set uid and ts
// 2. get data (rebuild reader if needed)
// 3. get new uid and ts
tqInfo
(
"retrieve using snapshot req offset: uid %ld ts %ld"
,
dataRsp
.
reqOffset
.
uid
,
dataRsp
.
reqOffset
.
ts
);
if
(
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
fetchOffsetNew
,
workerId
)
<
0
)
{
ASSERT
(
0
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
f2468013
...
...
@@ -19,6 +19,10 @@
#include "tref.h"
#include "walInt.h"
bool
FORCE_INLINE
walLogExist
(
SWal
*
pWal
,
int64_t
ver
)
{
return
!
walIsEmpty
(
pWal
)
&&
walGetFirstVer
(
pWal
)
<=
ver
&&
walGetLastVer
(
pWal
)
>=
ver
;
}
bool
FORCE_INLINE
walIsEmpty
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
==
-
1
;
}
int64_t
FORCE_INLINE
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
vers
.
firstVer
;
}
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
f2468013
...
...
@@ -93,7 +93,7 @@ class TMQCom:
return
resultList
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
,
alias
=
0
):
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
,
alias
=
0
,
snapshot
=
0
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
if
valgrind
==
1
:
...
...
@@ -109,7 +109,7 @@ class TMQCom:
os
.
system
(
shellCmd
)
processorName
=
processorNameNew
shellCmd
=
'mintty -h never '
+
processorName
+
' -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s
"
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s
-e %d "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
,
snapshot
)
shellCmd
+=
"> nul 2>&1 &"
else
:
processorName
=
buildPath
+
'/build/bin/tmq_sim'
...
...
@@ -119,7 +119,7 @@ class TMQCom:
os
.
system
(
shellCmd
)
processorName
=
processorNameNew
shellCmd
=
'nohup '
+
processorName
+
' -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s
"
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s
-e %d "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
,
snapshot
)
shellCmd
+=
"> /dev/null 2>&1 &"
tdLog
.
info
(
shellCmd
)
os
.
system
(
shellCmd
)
...
...
tests/system-test/7-tmq/tmqConsFromTsdb.py
0 → 100644
浏览文件 @
f2468013
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db1'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdDnodes
.
stop
(
1
)
time
.
sleep
(
2
)
tdDnodes
.
start
(
1
)
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as stable %s"
%
(
topicNameList
[
0
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/test/c/tmqSim.c
浏览文件 @
f2468013
...
...
@@ -36,7 +36,11 @@
#define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32)
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
NOTIFY_CMD_START_COMMIT
,
NOTIFY_CMD_ID_BUTT
}
NOTIFY_CMD_ID
;
typedef
enum
{
NOTIFY_CMD_START_CONSUM
,
NOTIFY_CMD_START_COMMIT
,
NOTIFY_CMD_ID_BUTT
,
}
NOTIFY_CMD_ID
;
typedef
struct
{
TdThread
thread
;
...
...
@@ -633,8 +637,9 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int64_t
lastTotalMsgs
=
0
;
uint64_t
lastPrintTime
=
taosGetTimestampMs
();
uint64_t
startTs
=
taosGetTimestampMs
();
int32_t
consumeDelay
=
g_stConfInfo
.
consumeDelay
==
-
1
?
-
1
:
(
g_stConfInfo
.
consumeDelay
*
1000
);
while
(
running
)
{
...
...
@@ -647,20 +652,22 @@ void loop_consume(SThreadInfo* pInfo) {
taos_free_result
(
tmqMsg
);
totalMsgs
++
;
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
totalMsgs
);
lastPrintTime
=
currentPrintTime
;
}
int64_t
currentPrintTime
=
taosGetTimestampMs
();
if
(
currentPrintTime
-
lastPrintTime
>
10
*
1000
)
{
taosFprintfFile
(
g_fp
,
"consumer id %d has currently poll total msgs: %"
PRId64
", period rate: %.3f msgs/second
\n
"
,
pInfo
->
consumerId
,
totalMsgs
,
(
totalMsgs
-
lastTotalMsgs
)
*
1000
.
0
/
(
currentPrintTime
-
lastPrintTime
));
lastPrintTime
=
currentPrintTime
;
lastTotalMsgs
=
totalMsgs
;
}
if
(
0
==
once_flag
)
{
once_flag
=
1
;
notifyMainScript
(
pInfo
,
NOTIFY_CMD_START_CONSUM
);
}
if
(
totalRows
>=
pInfo
->
expectMsgCnt
)
{
if
(
(
totalRows
>=
pInfo
->
expectMsgCnt
)
||
(
totalMsgs
>=
pInfo
->
expectMsgCnt
)
)
{
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s over than expect rows, so break consume
\n
"
,
getCurrentTimeString
(
tmpString
));
break
;
...
...
@@ -671,7 +678,7 @@ void loop_consume(SThreadInfo* pInfo) {
break
;
}
}
if
(
0
==
running
)
{
taosFprintfFile
(
g_fp
,
"receive stop signal and not continue consume
\n
"
);
}
...
...
@@ -881,11 +888,11 @@ int main(int32_t argc, char* argv[]) {
int64_t
t
=
end
-
start
;
if
(
0
==
t
)
t
=
1
;
double
tInMs
=
(
double
)
t
/
1000000
.
0
;
taosFprintfFile
(
g_fp
,
"Spent %.4f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.2
f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
"Spent %.3f seconds to poll msgs: %"
PRIu64
" with %d thread(s), throughput: %.3
f msgs/second
\n\n
"
,
tInMs
,
totalMsgs
,
g_stConfInfo
.
numOfThread
,
(
double
)(
totalMsgs
/
tInMs
));
taosFprintfFile
(
g_fp
,
"==== close tmqlog ====
\n
"
);
taosCloseFile
(
&
g_fp
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录