Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b899a8b7
TDengine
项目概览
taosdata
/
TDengine
9 个月 前同步成功
通知
1176
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
b899a8b7
编写于
8月 22, 2023
作者:
D
dm chen
提交者:
GitHub
8月 22, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feat/TD-22970
上级
dda51d80
1a0ebb7e
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
444 addition
and
91 deletion
+444
-91
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-1
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+31
-6
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+1
-1
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+13
-14
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+4
-5
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+17
-5
source/dnode/mnode/sdb/inc/sdb.h
source/dnode/mnode/sdb/inc/sdb.h
+3
-0
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+1
-0
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+2
-0
source/dnode/vnode/src/tsdb/tsdbFS2.c
source/dnode/vnode/src/tsdb/tsdbFS2.c
+202
-8
source/dnode/vnode/src/tsdb/tsdbFile2.c
source/dnode/vnode/src/tsdb/tsdbFile2.c
+1
-1
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/parallel_test/split_case.sh
tests/parallel_test/split_case.sh
+2
-0
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+9
-0
tests/script/win-test-file
tests/script/win-test-file
+4
-45
tests/system-test/1-insert/precisionNS.py
tests/system-test/1-insert/precisionNS.py
+1
-1
tests/system-test/7-tmq/tmqClientConsLog.py
tests/system-test/7-tmq/tmqClientConsLog.py
+3
-2
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
+139
-0
tests/system-test/test.py
tests/system-test/test.py
+2
-1
tests/system-test/win-test-file
tests/system-test/win-test-file
+4
-0
未找到文件。
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
b899a8b7
...
...
@@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrnConflct
conflict
,
const
SRpcMsg
*
pReq
,
const
char
*
opername
);
void
mndTransDrop
(
STrans
*
pTrans
);
int32_t
mndTransAppendPrepareLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendNullLog
(
STrans
*
pTrans
);
int32_t
mndTransAppendPrepareAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
);
void
mndTransSetRpcRsp
(
STrans
*
pTrans
,
void
*
pCont
,
int32_t
contLen
);
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
b899a8b7
...
...
@@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray
*
mndBuildDnodesArray
(
SMnode
*
,
int32_t
exceptDnodeId
);
int32_t
mndAllocSmaVgroup
(
SMnode
*
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
int32_t
mndAdd
PrepareNewVg
Action
(
SMnode
*
,
STrans
*
pTrans
,
SVgObj
*
pVg
);
int32_t
mndAdd
NewVgPrepare
Action
(
SMnode
*
,
STrans
*
pTrans
,
SVgObj
*
pVg
);
int32_t
mndAddCreateVnodeAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
);
int32_t
mndAddAlterVnodeConfirmAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
int32_t
mndAddAlterVnodeAction
(
SMnode
*
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
tmsg_t
msgType
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
b899a8b7
...
...
@@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static
int32_t
mndDbActionInsert
(
SSdb
*
pSdb
,
SDbObj
*
pDb
);
static
int32_t
mndDbActionDelete
(
SSdb
*
pSdb
,
SDbObj
*
pDb
);
static
int32_t
mndDbActionUpdate
(
SSdb
*
pSdb
,
SDbObj
*
pOld
,
SDbObj
*
pNew
);
static
int32_t
mndNewDbActionValidate
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
void
*
pObj
);
static
int32_t
mndProcessCreateDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessAlterDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropDbReq
(
SRpcMsg
*
pReq
);
...
...
@@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) {
.
insertFp
=
(
SdbInsertFp
)
mndDbActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndDbActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndDbActionDelete
,
.
validateFp
=
(
SdbValidateFp
)
mndNewDbActionValidate
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_DB
,
mndProcessCreateDbReq
);
...
...
@@ -247,6 +250,19 @@ _OVER:
return
pRow
;
}
static
int32_t
mndNewDbActionValidate
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
void
*
pObj
)
{
SDbObj
*
pNewDb
=
pObj
;
SDbObj
*
pOldDb
=
sdbAcquire
(
pMnode
->
pSdb
,
SDB_DB
,
pNewDb
->
name
);
if
(
pOldDb
!=
NULL
)
{
mError
(
"trans:%d, db name already in use. name: %s"
,
pTrans
->
id
,
pNewDb
->
name
);
sdbRelease
(
pMnode
->
pSdb
,
pOldDb
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndDbActionInsert
(
SSdb
*
pSdb
,
SDbObj
*
pDb
)
{
mTrace
(
"db:%s, perform insert action, row:%p"
,
pDb
->
name
,
pDb
);
return
0
;
...
...
@@ -448,9 +464,18 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
tsdbPageSize
<=
0
)
pCfg
->
tsdbPageSize
=
TSDB_DEFAULT_TSDB_PAGESIZE
;
}
static
int32_t
mndSetPrepareNewVgActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
static
int32_t
mndSetCreateDbPrepareAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdbRaw
*
pDbRaw
=
mndDbActionEncode
(
pDb
);
if
(
pDbRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendPrepareLog
(
pTrans
,
pDbRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pDbRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetNewVgPrepareActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
for
(
int32_t
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
if
(
mndAdd
PrepareNewVg
Action
(
pMnode
,
pTrans
,
(
pVgroups
+
v
))
!=
0
)
return
-
1
;
if
(
mndAdd
NewVgPrepare
Action
(
pMnode
,
pTrans
,
(
pVgroups
+
v
))
!=
0
)
return
-
1
;
}
return
0
;
}
...
...
@@ -459,7 +484,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw
*
pDbRaw
=
mndDbActionEncode
(
pDb
);
if
(
pDbRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pDbRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pDbRaw
,
SDB_STATUS_
CREATING
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pDbRaw
,
SDB_STATUS_
UPDATE
)
!=
0
)
return
-
1
;
for
(
int32_t
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
pVgroups
+
v
);
...
...
@@ -633,11 +658,11 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
if
(
mndTransCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
mndTransSetOper
(
pTrans
,
MND_OPER_CREATE_DB
);
if
(
mndSetPrepareNewVgActions
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbRedoActions
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbRedoLogs
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbPrepareAction
(
pMnode
,
pTrans
,
&
dbObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetNewVgPrepareActions
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbUndoLogs
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbCommitLogs
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
,
pNewUserDuped
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbRedoActions
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateDbUndoActions
(
pMnode
,
pTrans
,
&
dbObj
,
pVgroups
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
b899a8b7
...
...
@@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetSerial
(
pTrans
);
mInfo
(
"trans:%d, used to create sma:%s stream:%s"
,
pTrans
->
id
,
pCreate
->
name
,
streamObj
.
name
);
if
(
mndAdd
PrepareNewVg
Action
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndAdd
NewVgPrepare
Action
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaCommitLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
b899a8b7
...
...
@@ -17,7 +17,6 @@
#include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h"
#include "mndVgroup.h"
static
int32_t
mndSyncEqCtrlMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
||
pMsg
->
pCont
==
NULL
)
{
...
...
@@ -75,25 +74,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
}
static
int32_t
mndTransValidatePrepareAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
STransAction
*
pAction
)
{
SSdbRaw
*
pRaw
=
pAction
->
pRaw
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdbRow
*
pRow
=
NULL
;
int32_t
code
=
-
1
;
void
*
pObj
=
NULL
;
int
code
=
-
1
;
if
(
pAction
->
msgType
==
TDMT_MND_CREATE_VG
)
{
pRow
=
mndVgroupActionDecode
(
pAction
->
pRaw
);
if
(
pRow
==
NULL
)
goto
_OUT
;
if
(
pRaw
->
status
!=
SDB_STATUS_CREATING
)
goto
_OUT
;
SVgObj
*
pVgroup
=
sdbGetRowObj
(
pRow
);
if
(
pVgroup
==
NULL
)
goto
_OUT
;
pRow
=
(
pSdb
->
decodeFps
[
pRaw
->
type
])(
pRaw
);
if
(
pRow
==
NULL
)
goto
_OUT
;
pObj
=
sdbGetRowObj
(
pRow
);
if
(
pObj
==
NULL
)
goto
_OUT
;
int32_t
maxVgId
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_VGROUP
);
if
(
maxVgId
>
pVgroup
->
vgId
)
{
mError
(
"trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d"
,
pTrans
->
id
,
pVgroup
->
vgId
,
maxVgId
);
goto
_OUT
;
}
SdbValidateFp
validateFp
=
pSdb
->
validateFps
[
pRaw
->
type
];
code
=
0
;
if
(
validateFp
)
{
code
=
validateFp
(
pMnode
,
pTrans
,
pObj
);
}
code
=
0
;
_OUT:
taosMemoryFreeClear
(
pRow
);
return
code
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
b899a8b7
...
...
@@ -655,11 +655,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return
mndTransAppendAction
(
pTrans
->
commitActions
,
&
action
);
}
int32_t
mndTransAppendPrepareAction
(
STrans
*
pTrans
,
STransAction
*
pAction
)
{
pAction
->
stage
=
TRN_STAGE_PREPARE
;
pAction
->
actionType
=
TRANS_ACTION_RAW
;
pAction
->
mTraceId
=
pTrans
->
mTraceId
;
return
mndTransAppendAction
(
pTrans
->
prepareActions
,
pAction
);
int32_t
mndTransAppendPrepareLog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
STransAction
action
=
{
.
pRaw
=
pRaw
,
.
stage
=
TRN_STAGE_PREPARE
,
.
actionType
=
TRANS_ACTION_RAW
,
.
mTraceId
=
pTrans
->
mTraceId
};
return
mndTransAppendAction
(
pTrans
->
prepareActions
,
&
action
);
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
STransAction
*
pAction
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
b899a8b7
...
...
@@ -33,6 +33,7 @@
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOld
,
SVgObj
*
pNew
);
static
int32_t
mndNewVgActionValidate
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
void
*
pObj
);
static
int32_t
mndRetrieveVgroups
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.
insertFp
=
(
SdbInsertFp
)
mndVgroupActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndVgroupActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupActionDelete
,
.
validateFp
=
(
SdbValidateFp
)
mndNewVgActionValidate
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndTransProcessRsp
);
...
...
@@ -178,6 +180,17 @@ _OVER:
return
pRow
;
}
static
int32_t
mndNewVgActionValidate
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
void
*
pObj
)
{
SVgObj
*
pVgroup
=
pObj
;
int32_t
maxVgId
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_VGROUP
);
if
(
maxVgId
>
pVgroup
->
vgId
)
{
mError
(
"trans:%d, vgroup id %d already in use. maxVgId:%d"
,
pTrans
->
id
,
pVgroup
->
vgId
,
maxVgId
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
)
{
mTrace
(
"vgId:%d, perform insert action, row:%p"
,
pVgroup
->
vgId
,
pVgroup
);
return
0
;
...
...
@@ -1454,12 +1467,11 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
return
0
;
}
int32_t
mndAdd
PrepareNewVg
Action
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVg
)
{
int32_t
mndAdd
NewVgPrepare
Action
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SVgObj
*
pVg
)
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
pVg
);
if
(
pRaw
==
NULL
)
goto
_err
;
STransAction
action
=
{.
pRaw
=
pRaw
,
.
msgType
=
TDMT_MND_CREATE_VG
};
if
(
mndTransAppendPrepareAction
(
pTrans
,
&
action
)
!=
0
)
goto
_err
;
if
(
mndTransAppendPrepareLog
(
pTrans
,
pRaw
)
!=
0
)
goto
_err
;
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_CREATING
);
pRaw
=
NULL
;
return
0
;
...
...
@@ -2728,13 +2740,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
int32_t
maxVgId
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_VGROUP
);
int32_t
srcVgId
=
newVg1
.
vgId
;
newVg1
.
vgId
=
maxVgId
;
if
(
mndAdd
PrepareNewVg
Action
(
pMnode
,
pTrans
,
&
newVg1
)
!=
0
)
goto
_OVER
;
if
(
mndAdd
NewVgPrepare
Action
(
pMnode
,
pTrans
,
&
newVg1
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeHashRangeAction
(
pMnode
,
pTrans
,
srcVgId
,
&
newVg1
)
!=
0
)
goto
_OVER
;
maxVgId
++
;
srcVgId
=
newVg2
.
vgId
;
newVg2
.
vgId
=
maxVgId
;
if
(
mndAdd
PrepareNewVg
Action
(
pMnode
,
pTrans
,
&
newVg2
)
!=
0
)
goto
_OVER
;
if
(
mndAdd
NewVgPrepare
Action
(
pMnode
,
pTrans
,
&
newVg2
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeHashRangeAction
(
pMnode
,
pTrans
,
srcVgId
,
&
newVg2
)
!=
0
)
goto
_OVER
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg1
)
!=
0
)
goto
_OVER
;
...
...
source/dnode/mnode/sdb/inc/sdb.h
浏览文件 @
b899a8b7
...
...
@@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef
int32_t
(
*
SdbUpdateFp
)(
SSdb
*
pSdb
,
void
*
pSrcObj
,
void
*
pDstObj
);
typedef
int32_t
(
*
SdbDeleteFp
)(
SSdb
*
pSdb
,
void
*
pObj
,
bool
callFunc
);
typedef
int32_t
(
*
SdbDeployFp
)(
SMnode
*
pMnode
);
typedef
int32_t
(
*
SdbValidateFp
)(
SMnode
*
pMnode
,
void
*
pTrans
,
void
*
pObj
);
typedef
SSdbRow
*
(
*
SdbDecodeFp
)(
SSdbRaw
*
pRaw
);
typedef
SSdbRaw
*
(
*
SdbEncodeFp
)(
void
*
pObj
);
typedef
bool
(
*
sdbTraverseFp
)(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
);
...
...
@@ -189,6 +190,7 @@ typedef struct SSdb {
SdbDeployFp
deployFps
[
SDB_MAX
];
SdbEncodeFp
encodeFps
[
SDB_MAX
];
SdbDecodeFp
decodeFps
[
SDB_MAX
];
SdbValidateFp
validateFps
[
SDB_MAX
];
TdThreadMutex
filelock
;
}
SSdb
;
...
...
@@ -207,6 +209,7 @@ typedef struct {
SdbInsertFp
insertFp
;
SdbUpdateFp
updateFp
;
SdbDeleteFp
deleteFp
;
SdbValidateFp
validateFp
;
}
SSdbTable
;
typedef
struct
SSdbOpt
{
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
b899a8b7
...
...
@@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb
->
deployFps
[
sdbType
]
=
table
.
deployFp
;
pSdb
->
encodeFps
[
sdbType
]
=
table
.
encodeFp
;
pSdb
->
decodeFps
[
sdbType
]
=
table
.
decodeFp
;
pSdb
->
validateFps
[
sdbType
]
=
table
.
validateFp
;
int32_t
hashType
=
0
;
if
(
keyType
==
SDB_KEY_INT32
)
{
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
b899a8b7
...
...
@@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) {
return
"dropped"
;
case
SDB_STATUS_INIT
:
return
"init"
;
case
SDB_STATUS_UPDATE
:
return
"update"
;
default:
return
"undefine"
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbFS2.c
浏览文件 @
b899a8b7
...
...
@@ -17,12 +17,24 @@
#include "tsdbUpgrade.h"
#include "vnd.h"
extern
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
);
extern
int
vnodeScheduleTaskEx
(
int
tpid
,
int
(
*
execute
)(
void
*
),
void
*
arg
);
extern
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
);
extern
int
vnodeScheduleTaskEx
(
int
tpid
,
int
(
*
execute
)(
void
*
),
void
*
arg
);
extern
void
remove_file
(
const
char
*
fname
);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
typedef
struct
STFileHashEntry
{
struct
STFileHashEntry
*
next
;
char
fname
[
TSDB_FILENAME_LEN
];
}
STFileHashEntry
;
typedef
struct
{
int32_t
numFile
;
int32_t
numBucket
;
STFileHashEntry
**
buckets
;
}
STFileHash
;
enum
{
TSDB_FS_STATE_NONE
=
0
,
TSDB_FS_STATE_OPEN
,
...
...
@@ -315,10 +327,8 @@ _exit:
}
// static int32_t
static
int32_t
apply_abort
(
STFileSystem
*
fs
)
{
// TODO
return
0
;
}
static
int32_t
tsdbFSDoSanAndFix
(
STFileSystem
*
fs
);
static
int32_t
apply_abort
(
STFileSystem
*
fs
)
{
return
tsdbFSDoSanAndFix
(
fs
);
}
static
int32_t
abort_edit
(
STFileSystem
*
fs
)
{
char
fname
[
TSDB_FILENAME_LEN
];
...
...
@@ -349,6 +359,180 @@ _exit:
return
code
;
}
static
int32_t
tsdbFSDoScanAndFixFile
(
STFileSystem
*
fs
,
const
STFileObj
*
fobj
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
// check file existence
if
(
!
taosCheckExistFile
(
fobj
->
fname
))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
tsdbError
(
"vgId:%d %s failed since file:%s does not exist"
,
TD_VID
(
fs
->
tsdb
->
pVnode
),
__func__
,
fobj
->
fname
);
return
code
;
}
{
// TODO: check file size
// int64_t fsize;
// if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
// code = TAOS_SYSTEM_ERROR(terrno);
// tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__,
// fobj->fname, tstrerror(code));
// return code;
// }
}
return
0
;
}
static
void
tsdbFSDestroyFileObjHash
(
STFileHash
*
hash
);
static
int32_t
tsdbFSAddEntryToFileObjHash
(
STFileHash
*
hash
,
const
char
*
fname
)
{
STFileHashEntry
*
entry
=
taosMemoryMalloc
(
sizeof
(
*
entry
));
if
(
entry
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
strcpy
(
entry
->
fname
,
fname
);
uint32_t
idx
=
MurmurHash3_32
(
fname
,
strlen
(
fname
))
%
hash
->
numBucket
;
entry
->
next
=
hash
->
buckets
[
idx
];
hash
->
buckets
[
idx
]
=
entry
;
hash
->
numFile
++
;
return
0
;
}
static
int32_t
tsdbFSCreateFileObjHash
(
STFileSystem
*
fs
,
STFileHash
*
hash
)
{
int32_t
code
=
0
;
char
fname
[
TSDB_FILENAME_LEN
];
// init hash table
hash
->
numFile
=
0
;
hash
->
numBucket
=
4096
;
hash
->
buckets
=
taosMemoryCalloc
(
hash
->
numBucket
,
sizeof
(
STFileHashEntry
*
));
if
(
hash
->
buckets
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
// vnode.json
current_fname
(
fs
->
tsdb
,
fname
,
TSDB_FCURRENT
);
code
=
tsdbFSAddEntryToFileObjHash
(
hash
,
fname
);
if
(
code
)
goto
_exit
;
// other
STFileSet
*
fset
=
NULL
;
TARRAY2_FOREACH
(
fs
->
fSetArr
,
fset
)
{
// data file
for
(
int32_t
i
=
0
;
i
<
TSDB_FTYPE_MAX
;
i
++
)
{
if
(
fset
->
farr
[
i
]
!=
NULL
)
{
code
=
tsdbFSAddEntryToFileObjHash
(
hash
,
fset
->
farr
[
i
]
->
fname
);
if
(
code
)
goto
_exit
;
}
}
// stt file
SSttLvl
*
lvl
=
NULL
;
TARRAY2_FOREACH
(
fset
->
lvlArr
,
lvl
)
{
STFileObj
*
fobj
;
TARRAY2_FOREACH
(
lvl
->
fobjArr
,
fobj
)
{
code
=
tsdbFSAddEntryToFileObjHash
(
hash
,
fobj
->
fname
);
if
(
code
)
goto
_exit
;
}
}
}
_exit:
if
(
code
)
{
tsdbFSDestroyFileObjHash
(
hash
);
}
return
code
;
}
static
const
STFileHashEntry
*
tsdbFSGetFileObjHashEntry
(
STFileHash
*
hash
,
const
char
*
fname
)
{
uint32_t
idx
=
MurmurHash3_32
(
fname
,
strlen
(
fname
))
%
hash
->
numBucket
;
STFileHashEntry
*
entry
=
hash
->
buckets
[
idx
];
while
(
entry
)
{
if
(
strcmp
(
entry
->
fname
,
fname
)
==
0
)
{
return
entry
;
}
entry
=
entry
->
next
;
}
return
NULL
;
}
static
void
tsdbFSDestroyFileObjHash
(
STFileHash
*
hash
)
{
for
(
int32_t
i
=
0
;
i
<
hash
->
numBucket
;
i
++
)
{
STFileHashEntry
*
entry
=
hash
->
buckets
[
i
];
while
(
entry
)
{
STFileHashEntry
*
next
=
entry
->
next
;
taosMemoryFree
(
entry
);
entry
=
next
;
}
}
taosMemoryFree
(
hash
->
buckets
);
memset
(
hash
,
0
,
sizeof
(
*
hash
));
}
static
int32_t
tsdbFSDoSanAndFix
(
STFileSystem
*
fs
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
{
// scan each file
STFileSet
*
fset
=
NULL
;
TARRAY2_FOREACH
(
fs
->
fSetArr
,
fset
)
{
// data file
for
(
int32_t
ftype
=
0
;
ftype
<
TSDB_FTYPE_MAX
;
ftype
++
)
{
if
(
fset
->
farr
[
ftype
]
==
NULL
)
continue
;
code
=
tsdbFSDoScanAndFixFile
(
fs
,
fset
->
farr
[
ftype
]);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
// stt file
SSttLvl
*
lvl
;
TARRAY2_FOREACH
(
fset
->
lvlArr
,
lvl
)
{
STFileObj
*
fobj
;
TARRAY2_FOREACH
(
lvl
->
fobjArr
,
fobj
)
{
code
=
tsdbFSDoScanAndFixFile
(
fs
,
fobj
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
}
}
}
{
// clear unreferenced files
STfsDir
*
dir
=
tfsOpendir
(
fs
->
tsdb
->
pVnode
->
pTfs
,
fs
->
tsdb
->
path
);
if
(
dir
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
terrno
);
lino
=
__LINE__
;
goto
_exit
;
}
STFileHash
fobjHash
=
{
0
};
code
=
tsdbFSCreateFileObjHash
(
fs
,
&
fobjHash
);
if
(
code
)
goto
_close_dir
;
for
(
const
STfsFile
*
file
=
NULL
;
(
file
=
tfsReaddir
(
dir
))
!=
NULL
;)
{
if
(
taosIsDir
(
file
->
aname
))
continue
;
if
(
tsdbFSGetFileObjHashEntry
(
&
fobjHash
,
file
->
aname
)
==
NULL
)
{
remove_file
(
file
->
aname
);
}
}
tsdbFSDestroyFileObjHash
(
&
fobjHash
);
_close_dir:
tfsClosedir
(
dir
);
}
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
fs
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbFSScanAndFix
(
STFileSystem
*
fs
)
{
fs
->
neid
=
0
;
...
...
@@ -356,8 +540,18 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
const
STFileSet
*
fset
;
TARRAY2_FOREACH
(
fs
->
fSetArr
,
fset
)
{
fs
->
neid
=
TMAX
(
fs
->
neid
,
tsdbTFileSetMaxCid
(
fset
));
}
// TODO
return
0
;
// scan and fix
int32_t
code
=
0
;
int32_t
lino
=
0
;
code
=
tsdbFSDoSanAndFix
(
fs
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
_exit:
if
(
code
)
{
TSDB_ERROR_LOG
(
TD_VID
(
fs
->
tsdb
->
pVnode
),
lino
,
code
);
}
return
code
;
}
static
int32_t
tsdbFSDupState
(
STFileSystem
*
fs
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbFile2.c
浏览文件 @
b899a8b7
...
...
@@ -41,7 +41,7 @@ static const struct {
[
TSDB_FTYPE_STT
]
=
{
"stt"
,
stt_to_json
,
stt_from_json
},
};
static
void
remove_file
(
const
char
*
fname
)
{
void
remove_file
(
const
char
*
fname
)
{
taosRemoveFile
(
fname
);
tsdbInfo
(
"file:%s is removed"
,
fname
);
}
...
...
tests/parallel_test/cases.task
浏览文件 @
b899a8b7
...
...
@@ -132,6 +132,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
...
...
tests/parallel_test/split_case.sh
浏览文件 @
b899a8b7
...
...
@@ -5,8 +5,10 @@ parm_path=$(pwd ${parm_path})
echo
"execute path:
${
parm_path
}
"
cd
${
parm_path
}
cp
cases.task
${
case_file
}
# comment udf and stream case in windows
sed
-i
'/udf/d'
${
case_file
}
sed
-i
'/Udf/d'
${
case_file
}
sed
-i
'/stream/d'
${
case_file
}
sed
-i
'/^$/d'
${
case_file
}
sed
-i
'$a\%%FINISHED%%'
${
case_file
}
...
...
tests/pytest/util/dnodes.py
浏览文件 @
b899a8b7
...
...
@@ -812,6 +812,15 @@ class TDDnodes:
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
).
strip
()
psCmd
=
"for /f %a in ('wmic process where
\"
name='tmq_sim'
\"
get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
).
strip
()
while
(
processID
):
print
(
processID
)
killCmd
=
"kill -9 %s > nul 2>&1"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
1
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
).
strip
()
else
:
psCmd
=
"ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
).
strip
()
...
...
tests/script/win-test-file
浏览文件 @
b899a8b7
...
...
@@ -26,8 +26,10 @@
./test.sh -f tsim/user/basic.sim
./test.sh -f tsim/user/password.sim
./test.sh -f tsim/user/privilege_db.sim
./test.sh -f tsim/user/privilege_sysinfo.sim
./test.sh -f tsim/user/privilege_topic.sim
./test.sh -f tsim/user/privilege_table.sim
./test.sh -f tsim/user/privilege_create_db.sim
./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/alter_replica_31.sim
./test.sh -f tsim/db/basic1.sim
...
...
@@ -183,6 +185,7 @@
./test.sh -f tsim/query/scalarNull.sim
./test.sh -f tsim/query/session.sim
./test.sh -f tsim/query/join_interval.sim
./test.sh -f tsim/query/join_pk.sim
./test.sh -f tsim/query/unionall_as_table.sim
./test.sh -f tsim/query/multi_order_by.sim
./test.sh -f tsim/query/sys_tbname.sim
...
...
@@ -197,6 +200,7 @@
./test.sh -f tsim/query/tag_scan.sim
./test.sh -f tsim/query/nullColSma.sim
./test.sh -f tsim/query/bug3398.sim
./test.sh -f tsim/query/explain_tsorder.sim
./test.sh -f tsim/qnode/basic1.sim
./test.sh -f tsim/snode/basic1.sim
./test.sh -f tsim/mnode/basic1.sim
...
...
@@ -233,51 +237,6 @@
./test.sh -f tsim/table/table.sim
./test.sh -f tsim/table/tinyint.sim
./test.sh -f tsim/table/vgroup.sim
./test.sh -f tsim/stream/basic0.sim -g
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/basic3.sim
./test.sh -f tsim/stream/basic4.sim
./test.sh -f tsim/stream/checkStreamSTable1.sim
./test.sh -f tsim/stream/checkStreamSTable.sim
./test.sh -f tsim/stream/deleteInterval.sim
./test.sh -f tsim/stream/deleteSession.sim
./test.sh -f tsim/stream/deleteState.sim
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/fillHistoryBasic1.sim
./test.sh -f tsim/stream/fillHistoryBasic2.sim
./test.sh -f tsim/stream/fillHistoryBasic3.sim
./test.sh -f tsim/stream/fillIntervalDelete0.sim
./test.sh -f tsim/stream/fillIntervalDelete1.sim
./test.sh -f tsim/stream/fillIntervalLinear.sim
./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
./test.sh -f tsim/stream/fillIntervalPrevNext.sim
./test.sh -f tsim/stream/fillIntervalRange.sim
./test.sh -f tsim/stream/fillIntervalValue.sim
./test.sh -f tsim/stream/ignoreCheckUpdate.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnSession.sim
./test.sh -f tsim/stream/partitionbyColumnState.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/pauseAndResume.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/sliding.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/state1.sim
./test.sh -f tsim/stream/triggerInterval0.sim
./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/udTableAndTag0.sim
./test.sh -f tsim/stream/udTableAndTag1.sim
./test.sh -f tsim/stream/udTableAndTag2.sim
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/basic2.sim
...
...
tests/system-test/1-insert/precisionNS.py
浏览文件 @
b899a8b7
...
...
@@ -226,7 +226,7 @@ class TDTestCase:
# init
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
seed
=
time
.
clock_gettime
(
time
.
CLOCK_REALTIME
)
seed
=
time
.
time
()
%
10000
random
.
seed
(
seed
)
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
...
...
tests/system-test/7-tmq/tmqClientConsLog.py
浏览文件 @
b899a8b7
...
...
@@ -93,19 +93,20 @@ class TDTestCase:
cfgPath
=
tdCom
.
getClientCfgPath
()
taosLogFile
=
'%s/../log/taoslog*'
%
(
cfgPath
)
filterResultFile
=
'%s/../log/filter'
%
(
cfgPath
)
cmdStr
=
'grep "process poll rsp, vgId:" %s >> %s'
%
(
taosLogFile
,
filterResultFile
)
cmdStr
=
'grep
-h
"process poll rsp, vgId:" %s >> %s'
%
(
taosLogFile
,
filterResultFile
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
consumerDict
=
{}
for
index
,
line
in
enumerate
(
open
(
filterResultFile
,
'r'
)):
# tdLog.info("row[%d]: %s"%(index, line))
valueList
=
line
.
split
(
','
)
# for i in range(len(valueList)):
# tdLog.info("index[%d]: %s"%(i, valueList[i]))
# get consumer id
list2
=
valueList
[
0
].
split
(
':'
)
list3
=
list2
[
4
].
split
()
list3
=
list2
[
3
].
split
()
consumerId
=
list3
[
0
]
print
(
"consumerId: %s"
%
(
consumerId
))
...
...
tests/system-test/7-tmq/tmqDataPrecisionUnit.py
0 → 100644
浏览文件 @
b899a8b7
import
sys
import
re
import
time
import
threading
from
taos.tmq
import
*
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
self
.
db_name
=
"tmq_db"
self
.
topic_name
=
"tmq_topic"
self
.
stable_name
=
"stb"
self
.
rows_per_table
=
1000
self
.
ctb_num
=
100
def
prepareData
(
self
,
precisionUnit
=
"ms"
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
startTS
=
1672502400000
if
precisionUnit
==
"us"
:
startTS
=
1672502400000000
elif
precisionUnit
==
"ns"
:
startTS
=
1672502400000000000
paraDict
=
{
'dbName'
:
self
.
db_name
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
self
.
stable_name
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
self
.
ctb_num
,
'rowsPerTbl'
:
self
.
rows_per_table
,
'batchNum'
:
100
,
'startTs'
:
startTS
,
# 2023-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
# init the consumer database
tmqCom
.
initConsumerTable
()
# create testing database、stable、ctables
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
self
.
replicaVar
,
precision
=
precisionUnit
)
tdLog
.
info
(
"create database %s successfully"
%
paraDict
[
"dbName"
])
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create stable %s successfully"
%
paraDict
[
"stbName"
])
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create child tables successfully"
)
# insert data into tables and wait the async thread exit
tdLog
.
info
(
"insert data into tables"
)
pThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
pThread
.
join
()
def
run
(
self
):
"""Check tmq feature for different data precision unit like "ms、us、ns"
"""
precision_unit
=
[
"ms"
,
"us"
,
"ns"
]
for
unit
in
precision_unit
:
tdLog
.
info
(
f
"start to test precision unit
{
unit
}
"
)
self
.
prepareData
(
precisionUnit
=
unit
)
# drop database if exists
tdSql
.
execute
(
f
"drop database if exists
{
self
.
db_name
}
"
)
self
.
prepareData
(
unit
)
# create topic
tdLog
.
info
(
"create topic from %s"
%
self
.
stable_name
)
queryString
=
"select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "
%
(
self
.
db_name
,
self
.
stable_name
)
sqlString
=
"create topic %s as %s"
%
(
self
.
topic_name
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# save consumer info
consumerId
=
0
expectrowcnt
=
self
.
rows_per_table
*
self
.
ctb_num
topicList
=
self
.
topic_name
ifcheckdata
=
0
ifManualCommit
=
0
keyList
=
'group.id:cgrp1,
\
enable.auto.commit:false,
\
auto.commit.interval.ms:6000,
\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# start consume processor
paraDict
=
{
'pollDelay'
:
15
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
self
.
db_name
,
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsFromQuery
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, act query rows: %d "
%
(
totalConsumeRows
,
totalRowsFromQuery
))
if
totalConsumeRows
<
totalRowsFromQuery
:
tdLog
.
exit
(
"tmq consume rows error!"
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
self
.
topic_name
)
tdSql
.
query
(
"drop topic %s"
%
self
.
topic_name
)
tdSql
.
execute
(
"drop database %s"
%
self
.
db_name
)
def
stop
(
self
):
tdSql
.
execute
(
f
"drop database if exists
{
self
.
db_name
}
"
)
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/test.py
浏览文件 @
b899a8b7
...
...
@@ -199,7 +199,7 @@ if __name__ == "__main__":
createDnodeNums
=
value
if
key
in
[
'-i'
,
'--independentMnode'
]:
independentMnode
=
valu
e
independentMnode
=
Fals
e
if
key
in
[
'-R'
,
'--restful'
]:
restful
=
True
...
...
@@ -553,6 +553,7 @@ if __name__ == "__main__":
else
:
# dnode > 1 cluster
tdLog
.
debug
(
"create an cluster with %s nodes and make %s dnode as independent mnode"
%
(
dnodeNums
,
mnodeNums
))
print
(
independentMnode
,
"independentMnode valuse"
)
dnodeslist
=
cluster
.
configure_cluster
(
dnodeNums
=
dnodeNums
,
mnodeNums
=
mnodeNums
,
independentMnode
=
independentMnode
)
tdDnodes
=
ClusterDnodes
(
dnodeslist
)
tdDnodes
.
init
(
deployPath
,
masterIp
)
...
...
tests/system-test/win-test-file
浏览文件 @
b899a8b7
...
...
@@ -17,6 +17,7 @@ python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
...
...
@@ -133,6 +134,8 @@ python3 ./test.py -f 0-others/sysinfo.py
python3 ./test.py -f 0-others/user_control.py
python3 ./test.py -f 0-others/user_manage.py
python3 ./test.py -f 0-others/user_privilege.py
python3 ./test.py -f 0-others/user_privilege_show.py
python3 ./test.py -f 0-others/user_privilege_all.py
python3 ./test.py -f 0-others/fsync.py
python3 ./test.py -f 0-others/multilevel.py
python3 ./test.py -f 0-others/compatibility.py
...
...
@@ -421,6 +424,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录