Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
21748def
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
21748def
编写于
11月 02, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/TD-20095
上级
00c37cf1
88ee272b
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
321 addition
and
277 deletion
+321
-277
cmake/cmake.version
cmake/cmake.version
+1
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+17
-29
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+9
-8
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+8
-4
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+126
-51
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+4
-2
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+18
-9
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+1
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+2
-2
source/libs/sync/inc/syncTools.h
source/libs/sync/inc/syncTools.h
+1
-3
source/libs/sync/inc/syncUtil.h
source/libs/sync/inc/syncUtil.h
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+20
-65
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+0
-12
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+7
-8
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+6
-8
source/libs/sync/test/syncReplicateTest.cpp
source/libs/sync/test/syncReplicateTest.cpp
+3
-3
source/libs/sync/test/syncSnapshotTest.cpp
source/libs/sync/test/syncSnapshotTest.cpp
+3
-3
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+8
-8
source/libs/sync/test/syncWriteTest.cpp
source/libs/sync/test/syncWriteTest.cpp
+3
-3
tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py
tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py
+5
-3
tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py
...stem-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py
+13
-9
tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
...test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
+3
-1
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
...-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
+3
-2
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
...test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
+3
-2
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
...-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
+5
-3
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
...test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
+3
-2
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
...-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
+3
-1
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
...test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
+8
-7
tests/system-test/6-cluster/clusterCommonCheck.py
tests/system-test/6-cluster/clusterCommonCheck.py
+13
-10
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+23
-16
未找到文件。
cmake/cmake.version
浏览文件 @
21748def
...
...
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.1.
5
")
SET(TD_VER_NUMBER "3.0.1.
6
")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
...
...
include/libs/sync/sync.h
浏览文件 @
21748def
...
...
@@ -25,8 +25,6 @@ extern "C" {
#include "tlrucache.h"
#include "tmsgcb.h"
extern
bool
gRaftDetailLog
;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
...
...
@@ -132,7 +130,7 @@ typedef struct SSnapshotMeta {
typedef
struct
SSyncFSM
{
void
*
data
;
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpPreCommitCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
void
(
*
FpRollBackCb
)(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
);
...
...
@@ -202,37 +200,27 @@ typedef struct SSyncInfo {
int32_t
(
*
syncEqCtrlMsg
)(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
);
}
SSyncInfo
;
int32_t
syncInit
();
void
syncCleanUp
();
bool
syncIsInit
();
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncStart
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
ESyncState
syncGetMyRole
(
int64_t
rid
);
bool
syncIsReady
(
int64_t
rid
);
const
char
*
syncGetMyRoleStr
(
int64_t
rid
);
bool
syncRestoreFinish
(
int64_t
rid
);
SyncTerm
syncGetMyTerm
(
int64_t
rid
);
SyncIndex
syncGetLastIndex
(
int64_t
rid
);
SyncIndex
syncGetCommitIndex
(
int64_t
rid
);
SyncGroupId
syncGetVgId
(
int64_t
rid
);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
int32_t
syncGetSnapshotByIndex
(
int64_t
rid
,
SyncIndex
index
,
SSnapshot
*
pSnapshot
);
typedef
struct
SSyncState
{
ESyncState
state
;
bool
restored
;
}
SSyncState
;
int32_t
syncInit
();
void
syncCleanUp
();
int64_t
syncOpen
(
SSyncInfo
*
pSyncInfo
);
void
syncStart
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
);
int32_t
syncReconfig
(
int64_t
rid
,
SSyncCfg
*
pCfg
);
int32_t
syncLeaderTransfer
(
int64_t
rid
);
int32_t
syncBeginSnapshot
(
int64_t
rid
,
int64_t
lastApplyIndex
);
int32_t
syncEndSnapshot
(
int64_t
rid
);
int32_t
syncLeaderTransfer
(
int64_t
rid
);
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
);
int32_t
syncProcessMsg
(
int64_t
rid
,
SRpcMsg
*
pMsg
);
const
char
*
sync
UtilState2String
(
ESyncState
state
);
SSyncState
syncGetState
(
int64_t
rid
);
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
const
char
*
sync
Str
(
ESyncState
state
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
21748def
...
...
@@ -487,14 +487,14 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
}
if
(
mndAcquireRpc
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
const
char
*
role
=
syncGetMyRoleStr
(
pMnode
->
syncMgmt
.
sync
);
bool
restored
=
syncIsRestoreFinish
(
pMnode
->
syncMgmt
.
sync
);
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SSyncState
state
=
syncGetState
(
pMnode
->
syncMgmt
.
sync
);
if
(
pMsg
->
msgType
==
TDMT_MND_TMQ_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TELEM_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TRANS_TIMER
||
pMsg
->
msgType
==
TDMT_MND_TTL_TIMER
||
pMsg
->
msgType
==
TDMT_MND_UPTIME_TIMER
)
{
mTrace
(
"timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s "
,
pMnode
->
restored
,
pMnode
->
stopped
,
restored
,
role
);
pMnode
->
stopped
,
state
.
restored
,
syncStr
(
state
.
restored
)
);
return
-
1
;
}
...
...
@@ -505,8 +505,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
mDebug
(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
(),
pMnode
->
restored
,
pMnode
->
stopped
,
restored
,
role
,
epSet
.
numOfEps
,
epSet
.
inUse
);
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
(),
pMnode
->
restored
,
pMnode
->
stopped
,
state
.
restored
,
syncStr
(
state
.
restored
),
epSet
.
numOfEps
,
epSet
.
inUse
);
if
(
epSet
.
numOfEps
>
0
)
{
for
(
int32_t
i
=
0
;
i
<
epSet
.
numOfEps
;
++
i
)
{
...
...
@@ -729,8 +729,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
}
int32_t
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
)
{
pLoad
->
syncState
=
syncGetMyRole
(
pMnode
->
syncMgmt
.
sync
);
pLoad
->
syncRestore
=
pMnode
->
restored
;
SSyncState
state
=
syncGetState
(
pMnode
->
syncMgmt
.
sync
);
pLoad
->
syncState
=
state
.
state
;
pLoad
->
syncRestore
=
state
.
restored
;
mTrace
(
"mnode current syncState is %s, syncRestore:%d"
,
syncStr
(
pLoad
->
syncState
),
pLoad
->
syncRestore
);
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
21748def
...
...
@@ -349,11 +349,15 @@ void mndSyncStop(SMnode *pMnode) {
}
bool
mndIsLeader
(
SMnode
*
pMnode
)
{
SSync
Mgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SSync
State
state
=
syncGetState
(
pMnode
->
syncMgmt
.
sync
)
;
if
(
!
syncIsReady
(
pMgmt
->
sync
))
{
// get terrno from syncIsReady
// terrno = TSDB_CODE_SYN_NOT_LEADER;
if
(
state
.
state
!=
TAOS_SYNC_STATE_LEADER
||
!
state
.
restored
)
{
if
(
state
.
state
!=
TAOS_SYNC_STATE_LEADER
)
{
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
}
else
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
mDebug
(
"vgId:1, mnode not ready, state:%s, restore:%d"
,
syncStr
(
state
.
state
),
state
.
restored
);
return
false
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
21748def
...
...
@@ -138,7 +138,7 @@ typedef struct SReaderStatus {
bool
loadFromFile
;
// check file stage
bool
composedDataBlock
;
// the returned data block is a composed block or not
SHashObj
*
pTableMap
;
// SHash<STableBlockScanInfo>
STableBlockScanInfo
*
pTableIter
;
// table iterator used in building in-memory buffer data blocks.
STableBlockScanInfo
*
*
pTableIter
;
// table iterator used in building in-memory buffer data blocks.
SUidOrderCheckInfo
uidCheckInfo
;
// check all table in uid order
SFileBlockDumpInfo
fBlockDumpInfo
;
SDFileSet
*
pCurrentFileset
;
// current opened file set
...
...
@@ -147,6 +147,12 @@ typedef struct SReaderStatus {
SDataBlockIter
blockIter
;
}
SReaderStatus
;
typedef
struct
SBlockInfoBuf
{
int32_t
currentIndex
;
SArray
*
pData
;
int32_t
numPerBucket
;
}
SBlockInfoBuf
;
struct
STsdbReader
{
STsdb
*
pTsdb
;
uint64_t
suid
;
...
...
@@ -164,9 +170,9 @@ struct STsdbReader {
STSchema
*
pMemSchema
;
// the previous schema for in-memory data, to avoid load schema too many times
SDataFReader
*
pFileReader
;
SVersionRange
verRange
;
int32_t
step
;
STsdbReader
*
innerReader
[
2
];
SBlockInfoBuf
blockInfoBuf
;
int32_t
step
;
STsdbReader
*
innerReader
[
2
];
};
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
...
...
@@ -232,6 +238,50 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
initBlockScanInfoBuf
(
SBlockInfoBuf
*
pBuf
,
int32_t
numOfTables
)
{
int32_t
num
=
numOfTables
/
pBuf
->
numPerBucket
;
int32_t
remainder
=
numOfTables
%
pBuf
->
numPerBucket
;
if
(
pBuf
->
pData
==
NULL
)
{
pBuf
->
pData
=
taosArrayInit
(
num
+
1
,
POINTER_BYTES
);
}
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
char
*
p
=
taosMemoryCalloc
(
pBuf
->
numPerBucket
,
sizeof
(
STableBlockScanInfo
));
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosArrayPush
(
pBuf
->
pData
,
&
p
);
}
if
(
remainder
>
0
)
{
char
*
p
=
taosMemoryCalloc
(
remainder
,
sizeof
(
STableBlockScanInfo
));
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
taosArrayPush
(
pBuf
->
pData
,
&
p
);
}
return
TSDB_CODE_SUCCESS
;
}
static
void
clearBlockScanInfoBuf
(
SBlockInfoBuf
*
pBuf
)
{
size_t
num
=
taosArrayGetSize
(
pBuf
->
pData
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
char
**
p
=
taosArrayGet
(
pBuf
->
pData
,
i
);
taosMemoryFree
(
*
p
);
}
taosArrayDestroy
(
pBuf
->
pData
);
}
static
void
*
getPosInBlockInfoBuf
(
SBlockInfoBuf
*
pBuf
,
int32_t
index
)
{
int32_t
bucketIndex
=
index
/
pBuf
->
numPerBucket
;
char
**
pBucket
=
taosArrayGet
(
pBuf
->
pData
,
bucketIndex
);
return
(
*
pBucket
)
+
(
index
%
pBuf
->
numPerBucket
)
*
sizeof
(
STableBlockScanInfo
);
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static
SHashObj
*
createDataBlockScanInfo
(
STsdbReader
*
pTsdbReader
,
const
STableKeyInfo
*
idList
,
int32_t
numOfTables
)
{
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
...
...
@@ -242,9 +292,23 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
int64_t
st
=
taosGetTimestampUs
();
initBlockScanInfoBuf
(
&
pTsdbReader
->
blockInfoBuf
,
numOfTables
);
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
].
uid
};
STableBlockScanInfo
*
pScanInfo
=
getPosInBlockInfoBuf
(
&
pTsdbReader
->
blockInfoBuf
,
j
);
pScanInfo
->
uid
=
idList
[
j
].
uid
;
if
(
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
))
{
int64_t
skey
=
pTsdbReader
->
window
.
skey
;
pScanInfo
->
lastKey
=
(
skey
>
INT64_MIN
)
?
(
skey
-
1
)
:
skey
;
}
else
{
int64_t
ekey
=
pTsdbReader
->
window
.
ekey
;
pScanInfo
->
lastKey
=
(
ekey
<
INT64_MAX
)
?
(
ekey
+
1
)
:
ekey
;
}
taosHashPut
(
pTableMap
,
&
pScanInfo
->
uid
,
sizeof
(
uint64_t
),
&
pScanInfo
,
POINTER_BYTES
);
#if 0
// STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey;
info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
...
...
@@ -254,7 +318,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
tsdbDebug
(
"%p check table uid:%"
PRId64
" from lastKey:%"
PRId64
" %s"
,
pTsdbReader
,
info
.
uid
,
info
.
lastKey
,
#endif
tsdbTrace
(
"%p check table uid:%"
PRId64
" from lastKey:%"
PRId64
" %s"
,
pTsdbReader
,
pScanInfo
->
uid
,
pScanInfo
->
lastKey
,
pTsdbReader
->
idStr
);
}
...
...
@@ -266,18 +332,19 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return
pTableMap
;
}
static
void
resetDataBlockScanInfo
(
SHashObj
*
pTableMap
,
int64_t
ts
)
{
STableBlockScanInfo
*
p
=
NULL
;
static
void
resetAllDataBlockScanInfo
(
SHashObj
*
pTableMap
,
int64_t
ts
)
{
STableBlockScanInfo
**
p
=
NULL
;
while
((
p
=
taosHashIterate
(
pTableMap
,
p
))
!=
NULL
)
{
p
->
iterInit
=
false
;
p
->
iiter
.
hasVal
=
false
;
if
(
p
->
iter
.
iter
!=
NULL
)
{
p
->
iter
.
iter
=
tsdbTbDataIterDestroy
(
p
->
iter
.
iter
);
STableBlockScanInfo
*
pInfo
=
*
(
STableBlockScanInfo
**
)
p
;
pInfo
->
iterInit
=
false
;
pInfo
->
iiter
.
hasVal
=
false
;
if
(
pInfo
->
iter
.
iter
!=
NULL
)
{
pInfo
->
iter
.
iter
=
tsdbTbDataIterDestroy
(
pInfo
->
iter
.
iter
);
}
p
->
delSkyline
=
taosArrayDestroy
(
p
->
delSkyline
);
p
->
lastKey
=
ts
;
p
Info
->
delSkyline
=
taosArrayDestroy
(
pInfo
->
delSkyline
);
p
Info
->
lastKey
=
ts
;
}
}
...
...
@@ -298,10 +365,10 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
tMapDataClear
(
&
p
->
mapData
);
}
static
void
destroyBlockScanInfo
(
SHashObj
*
pTableMap
)
{
STableBlockScanInfo
*
p
=
NULL
;
static
void
destroy
All
BlockScanInfo
(
SHashObj
*
pTableMap
)
{
void
*
p
=
NULL
;
while
((
p
=
taosHashIterate
(
pTableMap
,
p
))
!=
NULL
)
{
clearBlockScanInfo
(
p
);
clearBlockScanInfo
(
*
(
STableBlockScanInfo
**
)
p
);
}
taosHashCleanup
(
pTableMap
);
...
...
@@ -506,7 +573,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader
->
verRange
=
getQueryVerRange
(
pVnode
,
pCond
,
level
);
pReader
->
type
=
pCond
->
type
;
pReader
->
window
=
updateQueryTimeWindow
(
pReader
->
pTsdb
,
&
pCond
->
twindows
);
pReader
->
blockInfoBuf
.
numPerBucket
=
1000
;
// 1000 tables per bucket
ASSERT
(
pCond
->
numOfCols
>
0
);
limitOutputBufferSize
(
pCond
,
&
pReader
->
capacity
);
...
...
@@ -577,7 +644,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
continue
;
}
STableBlockScanInfo
*
pScanInfo
=
p
;
STableBlockScanInfo
*
pScanInfo
=
*
(
STableBlockScanInfo
**
)
p
;
if
(
pScanInfo
->
pBlockList
==
NULL
)
{
pScanInfo
->
pBlockList
=
taosArrayInit
(
4
,
sizeof
(
SBlockIndex
));
}
...
...
@@ -597,7 +664,7 @@ _end:
}
static
void
cleanupTableScanInfo
(
SHashObj
*
pTableMap
)
{
STableBlockScanInfo
*
px
=
NULL
;
STableBlockScanInfo
*
*
px
=
NULL
;
while
(
1
)
{
px
=
taosHashIterate
(
pTableMap
,
px
);
if
(
px
==
NULL
)
{
...
...
@@ -605,8 +672,8 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
}
// reset the index in last block when handing a new file
tMapDataClear
(
&
px
->
mapData
);
taosArrayClear
(
px
->
pBlockList
);
tMapDataClear
(
&
(
*
px
)
->
mapData
);
taosArrayClear
(
(
*
px
)
->
pBlockList
);
}
}
...
...
@@ -621,7 +688,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
SBlockIdx
*
pBlockIdx
=
taosArrayGet
(
pIndexList
,
i
);
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockIdx
->
uid
,
sizeof
(
int64_t
));
STableBlockScanInfo
*
pScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockIdx
->
uid
,
sizeof
(
int64_t
));
tMapDataReset
(
&
pScanInfo
->
mapData
);
tsdbReadDataBlk
(
pReader
->
pFileReader
,
pBlockIdx
,
&
pScanInfo
->
mapData
);
...
...
@@ -1069,14 +1137,14 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
static
int32_t
doSetCurrentBlock
(
SDataBlockIter
*
pBlockIter
,
const
char
*
idStr
)
{
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
pBlockIter
);
if
(
pBlockInfo
!=
NULL
)
{
STableBlockScanInfo
*
pScanInfo
=
taosHashGet
(
pBlockIter
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
STableBlockScanInfo
*
*
pScanInfo
=
taosHashGet
(
pBlockIter
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
pScanInfo
==
NULL
)
{
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, %s"
,
pBlockInfo
->
uid
,
idStr
);
return
TSDB_CODE_INVALID_PARA
;
}
SBlockIndex
*
pIndex
=
taosArrayGet
(
pScanInfo
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
pScanInfo
->
mapData
,
pIndex
->
ordinalIndex
,
&
pBlockIter
->
block
,
tGetDataBlk
);
SBlockIndex
*
pIndex
=
taosArrayGet
(
(
*
pScanInfo
)
->
pBlockList
,
pBlockInfo
->
tbBlockIdx
);
tMapDataGetItemByIdx
(
&
(
*
pScanInfo
)
->
mapData
,
pIndex
->
ordinalIndex
,
&
pBlockIter
->
block
,
tGetDataBlk
);
}
#if 0
...
...
@@ -1111,7 +1179,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
break
;
}
STableBlockScanInfo
*
pTableScanInfo
=
(
STableBlockScanInfo
*
)
ptr
;
STableBlockScanInfo
*
pTableScanInfo
=
*
(
STableBlockScanInfo
*
*
)
ptr
;
if
(
pTableScanInfo
->
pBlockList
==
NULL
||
taosArrayGetSize
(
pTableScanInfo
->
pBlockList
)
==
0
)
{
continue
;
}
...
...
@@ -2235,7 +2303,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo
*
pBlockScanInfo
=
NULL
;
if
(
pBlockInfo
!=
NULL
)
{
pBlockScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
pBlockScanInfo
==
NULL
)
{
code
=
TSDB_CODE_INVALID_PARA
;
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, total tables:%d, %s"
,
pBlockInfo
->
uid
,
...
...
@@ -2255,7 +2323,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
}
}
else
{
// file blocks not exist
pBlockScanInfo
=
pReader
->
status
.
pTableIter
;
pBlockScanInfo
=
*
pReader
->
status
.
pTableIter
;
}
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
...
...
@@ -2480,7 +2548,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
void
*
p
=
taosHashIterate
(
pStatus
->
pTableMap
,
NULL
);
while
(
p
!=
NULL
)
{
STableBlockScanInfo
*
pScanInfo
=
p
;
STableBlockScanInfo
*
pScanInfo
=
*
(
STableBlockScanInfo
**
)
p
;
pOrderCheckInfo
->
tableUidList
[
index
++
]
=
pScanInfo
->
uid
;
p
=
taosHashIterate
(
pStatus
->
pTableMap
,
p
);
}
...
...
@@ -2554,7 +2622,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while
(
1
)
{
// load the last data block of current table
STableBlockScanInfo
*
pScanInfo
=
pStatus
->
pTableIter
;
STableBlockScanInfo
*
pScanInfo
=
*
(
STableBlockScanInfo
**
)
pStatus
->
pTableIter
;
bool
hasVal
=
initLastBlockReader
(
pLastBlockReader
,
pScanInfo
,
pReader
);
if
(
!
hasVal
)
{
bool
hasNexTable
=
moveToNextTable
(
pOrderedCheckInfo
,
pStatus
);
...
...
@@ -2592,9 +2660,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SLastBlockReader
*
pLastBlockReader
=
pReader
->
status
.
fileIter
.
pLastBlockReader
;
if
(
pBlockInfo
!=
NULL
)
{
pScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
pScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
}
else
{
pScanInfo
=
pReader
->
status
.
pTableIter
;
pScanInfo
=
*
pReader
->
status
.
pTableIter
;
}
if
(
pScanInfo
==
NULL
)
{
...
...
@@ -2659,11 +2727,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
}
STableBlockScanInfo
*
pBlockScanInfo
=
pStatus
->
pTableIter
;
initMemDataIterator
(
pBlockScanInfo
,
pReader
);
STableBlockScanInfo
*
*
pBlockScanInfo
=
pStatus
->
pTableIter
;
initMemDataIterator
(
*
pBlockScanInfo
,
pReader
);
int64_t
endKey
=
(
ASCENDING_TRAVERSE
(
pReader
->
order
))
?
INT64_MAX
:
INT64_MIN
;
int32_t
code
=
buildDataBlockFromBuf
(
pReader
,
pBlockScanInfo
,
endKey
);
int32_t
code
=
buildDataBlockFromBuf
(
pReader
,
*
pBlockScanInfo
,
endKey
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -3465,18 +3533,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo
int32_t
tsdbSetTableList
(
STsdbReader
*
pReader
,
const
void
*
pTableList
,
int32_t
num
)
{
ASSERT
(
pReader
!=
NULL
);
int32_t
size
=
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
STableBlockScanInfo
*
p
=
NULL
;
STableBlockScanInfo
*
*
p
=
NULL
;
while
((
p
=
taosHashIterate
(
pReader
->
status
.
pTableMap
,
p
))
!=
NULL
)
{
clearBlockScanInfo
(
p
);
clearBlockScanInfo
(
*
p
);
}
// todo handle the case where size is less than the value of num
ASSERT
(
size
>=
num
);
taosHashClear
(
pReader
->
status
.
pTableMap
);
STableKeyInfo
*
pList
=
(
STableKeyInfo
*
)
pTableList
;
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
pList
[
i
].
uid
};
taosHashPut
(
pReader
->
status
.
pTableMap
,
&
info
.
uid
,
sizeof
(
uint64_t
),
&
info
,
sizeof
(
info
));
STableBlockScanInfo
*
pInfo
=
getPosInBlockInfoBuf
(
&
pReader
->
blockInfoBuf
,
i
);
pInfo
->
uid
=
pList
[
i
].
uid
;
taosHashPut
(
pReader
->
status
.
pTableMap
,
&
pInfo
->
uid
,
sizeof
(
uint64_t
),
&
pInfo
,
POINTER_BYTES
);
}
return
TDB_CODE_SUCCESS
;
...
...
@@ -3680,8 +3753,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
cleanupDataBlockIterator
(
&
pReader
->
status
.
blockIter
);
size_t
numOfTables
=
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
destroyBlockScanInfo
(
pReader
->
status
.
pTableMap
);
destroy
All
BlockScanInfo
(
pReader
->
status
.
pTableMap
);
blockDataDestroy
(
pReader
->
pResBlock
);
clearBlockScanInfoBuf
(
&
pReader
->
blockInfoBuf
);
if
(
pReader
->
pFileReader
!=
NULL
)
{
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
...
...
@@ -3765,7 +3839,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if
(
pReader
->
step
==
EXTERNAL_ROWS_PREV
)
{
// prepare for the main scan
int32_t
code
=
doOpenReaderImpl
(
pReader
);
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
,
pReader
->
innerReader
[
0
]
->
window
.
ekey
);
reset
All
DataBlockScanInfo
(
pReader
->
status
.
pTableMap
,
pReader
->
innerReader
[
0
]
->
window
.
ekey
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -3782,7 +3856,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if
(
pReader
->
innerReader
[
1
]
!=
NULL
&&
pReader
->
step
==
EXTERNAL_ROWS_MAIN
)
{
// prepare for the next row scan
int32_t
code
=
doOpenReaderImpl
(
pReader
->
innerReader
[
1
]);
resetDataBlockScanInfo
(
pReader
->
innerReader
[
1
]
->
status
.
pTableMap
,
pReader
->
window
.
ekey
);
reset
All
DataBlockScanInfo
(
pReader
->
innerReader
[
1
]
->
status
.
pTableMap
,
pReader
->
window
.
ekey
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -3798,7 +3872,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
bool
tsdbTableNextDataBlock
(
STsdbReader
*
pReader
,
uint64_t
uid
)
{
STableBlockScanInfo
*
pBlockScanInfo
=
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
uid
,
sizeof
(
uid
));
STableBlockScanInfo
*
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pReader
->
status
.
pTableMap
,
&
uid
,
sizeof
(
uid
));
if
(
pBlockScanInfo
==
NULL
)
{
// no data block for the table of given uid
return
false
;
}
...
...
@@ -3911,7 +3985,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
}
SFileDataBlockInfo
*
pBlockInfo
=
getCurrentBlockInfo
(
&
pStatus
->
blockIter
);
STableBlockScanInfo
*
pBlockScanInfo
=
taosHashGet
(
pStatus
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
STableBlockScanInfo
*
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
taosHashGet
(
pStatus
->
pTableMap
,
&
pBlockInfo
->
uid
,
sizeof
(
pBlockInfo
->
uid
));
if
(
pBlockScanInfo
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
tsdbError
(
"failed to locate the uid:%"
PRIu64
" in query table uid list, total tables:%d, %s"
,
pBlockInfo
->
uid
,
...
...
@@ -3947,6 +4021,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return
TSDB_CODE_SUCCESS
;
}
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
pReader
->
order
=
pCond
->
order
;
pReader
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pReader
->
status
.
loadFromFile
=
true
;
...
...
@@ -3963,13 +4039,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t
numOfTables
=
taosHashGetSize
(
pReader
->
status
.
pTableMap
);
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pReader
->
pReadSnap
->
fs
.
aDFileSet
,
pReader
);
resetDataBlockIterator
(
&
pReader
->
status
.
b
lockIter
,
pReader
->
order
);
resetDataBlockIterator
(
pB
lockIter
,
pReader
->
order
);
int64_t
ts
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
pReader
->
window
.
skey
-
1
:
pReader
->
window
.
ekey
+
1
;
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
,
ts
);
reset
All
DataBlockScanInfo
(
pReader
->
status
.
pTableMap
,
ts
);
int32_t
code
=
0
;
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
int32_t
code
=
0
;
// no data in files, let's try buffer in memory
if
(
pReader
->
status
.
fileIter
.
numOfFiles
==
0
)
{
...
...
@@ -4071,7 +4146,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
pStatus
->
pTableIter
=
taosHashIterate
(
pStatus
->
pTableMap
,
NULL
);
while
(
pStatus
->
pTableIter
!=
NULL
)
{
STableBlockScanInfo
*
pBlockScanInfo
=
pStatus
->
pTableIter
;
STableBlockScanInfo
*
pBlockScanInfo
=
*
(
STableBlockScanInfo
**
)
pStatus
->
pTableIter
;
STbData
*
d
=
NULL
;
if
(
pReader
->
pTsdb
->
mem
!=
NULL
)
{
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
21748def
...
...
@@ -414,9 +414,11 @@ _exit:
}
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
SSyncState
state
=
syncGetState
(
pVnode
->
sync
);
pLoad
->
vgId
=
TD_VID
(
pVnode
);
pLoad
->
syncState
=
s
yncGetMyRole
(
pVnode
->
sync
)
;
pLoad
->
syncRestore
=
pVnode
->
restored
;
pLoad
->
syncState
=
s
tate
.
state
;
pLoad
->
syncRestore
=
state
.
restored
;
pLoad
->
cacheUsage
=
tsdbCacheGetUsage
(
pVnode
);
pLoad
->
numOfTables
=
metaGetTbNum
(
pVnode
->
pMeta
);
pLoad
->
numOfTimeSeries
=
metaGetTimeSeriesNum
(
pVnode
->
pMeta
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
21748def
...
...
@@ -309,13 +309,13 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vGTrace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
syncGetVgId
(
pVnode
->
sync
)
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
sync
UtilState2String
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
sync
Str
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
SRpcMsg
rsp
=
{.
code
=
pMeta
->
code
,
.
info
=
pMsg
->
info
};
vError
(
"vgId:%d, commit-cb execute error, type:%s, index:%"
PRId64
", error:0x%x %s"
,
syncGetVgId
(
pVnode
->
sync
)
,
vError
(
"vgId:%d, commit-cb execute error, type:%s, index:%"
PRId64
", error:0x%x %s"
,
pVnode
->
config
.
vgId
,
TMSG_INFO
(
pMsg
->
msgType
),
pMeta
->
index
,
pMeta
->
code
,
tstrerror
(
pMeta
->
code
));
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
...
...
@@ -338,8 +338,8 @@ static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, con
static
void
vnodeSyncRollBackMsg
(
const
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
*
pMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vTrace
(
"vgId:%d, rollback-cb is excuted, fsm:%p, index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
syncGetVgId
(
pVnode
->
sync
),
pFsm
,
pMeta
->
index
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncUtilState2String
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
)
,
TMSG_INFO
(
pMsg
->
msgType
));
}
#define USE_TSDB_SNAPSHOT
...
...
@@ -552,12 +552,21 @@ void vnodeSyncClose(SVnode *pVnode) {
syncStop
(
pVnode
->
sync
);
}
bool
vnodeIsRoleLeader
(
SVnode
*
pVnode
)
{
return
syncGetMyRole
(
pVnode
->
sync
)
==
TAOS_SYNC_STATE_LEADER
;
}
bool
vnodeIsRoleLeader
(
SVnode
*
pVnode
)
{
SSyncState
state
=
syncGetState
(
pVnode
->
sync
);
return
state
.
state
==
TAOS_SYNC_STATE_LEADER
;
}
bool
vnodeIsLeader
(
SVnode
*
pVnode
)
{
if
(
!
syncIsReady
(
pVnode
->
sync
))
{
vDebug
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncGetMyRoleStr
(
pVnode
->
sync
),
syncRestoreFinish
(
pVnode
->
sync
));
SSyncState
state
=
syncGetState
(
pVnode
->
sync
);
if
(
state
.
state
!=
TAOS_SYNC_STATE_LEADER
||
!
state
.
restored
)
{
if
(
state
.
state
!=
TAOS_SYNC_STATE_LEADER
)
{
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
}
else
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
vDebug
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncStr
(
state
.
state
),
state
.
restored
);
return
false
;
}
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
21748def
...
...
@@ -52,6 +52,7 @@ typedef struct SSyncEnv {
}
SSyncEnv
;
SSyncEnv
*
syncEnv
();
bool
syncIsInit
();
int64_t
syncNodeAdd
(
SSyncNode
*
pNode
);
void
syncNodeRemove
(
int64_t
rid
);
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
21748def
...
...
@@ -22,9 +22,9 @@ extern "C" {
#include "sync.h"
#include "syncTools.h"
#include "tlog.h"
#include "ttimer.h"
#include "taosdef.h"
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
// clang-format off
...
...
include/libs/sy
nc/syncTools.h
→
source/libs/sync/i
nc/syncTools.h
浏览文件 @
21748def
...
...
@@ -20,15 +20,13 @@
extern
"C"
{
#endif
#include "trpc.h"
// ------------------ ds -------------------
typedef
struct
SRaftId
{
SyncNodeId
addr
;
SyncGroupId
vgId
;
}
SRaftId
;
char
*
sync2SimpleStr
(
int64_t
rid
);
char
*
sync2SimpleStr
(
int64_t
rid
);
// for compatibility, the same as syncPropose
int32_t
syncForwardToPeer
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
...
...
source/libs/sync/inc/syncUtil.h
浏览文件 @
21748def
...
...
@@ -49,7 +49,7 @@ int32_t syncUtilQuorum(int32_t replicaNum);
cJSON
*
syncUtilNodeInfo2Json
(
const
SNodeInfo
*
p
);
cJSON
*
syncUtilRaftId2Json
(
const
SRaftId
*
p
);
char
*
syncUtilRaftId2Str
(
const
SRaftId
*
p
);
const
char
*
sync
UtilState2String
(
ESyncState
state
);
const
char
*
sync
Str
(
ESyncState
state
);
bool
syncUtilCanPrint
(
char
c
);
char
*
syncUtilprintBin
(
char
*
ptr
,
uint32_t
len
);
char
*
syncUtilprintBin2
(
char
*
ptr
,
uint32_t
len
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
21748def
...
...
@@ -505,50 +505,20 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return
ret
;
}
ESyncState
syncGetMyRole
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_STATE_ERROR
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
ESyncState
state
=
pSyncNode
->
state
;
syncNodeRelease
(
pSyncNode
);
return
state
;
}
SSyncState
syncGetState
(
int64_t
rid
)
{
SSyncState
state
=
{.
state
=
TAOS_SYNC_STATE_ERROR
};
bool
syncIsReady
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
b
=
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
&&
pSyncNode
->
restoreFinish
;
syncNodeRelease
(
pSyncNode
);
// if false, set error code
if
(
false
==
b
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
}
else
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
}
return
b
;
}
bool
syncIsRestoreFinish
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
if
(
pSyncNode
!=
NULL
)
{
state
.
state
=
pSyncNode
->
state
;
state
.
restored
=
pSyncNode
->
restoreFinish
;
syncNodeRelease
(
pSyncNode
);
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
b
=
pSyncNode
->
restoreFinish
;
syncNodeRelease
(
pSyncNode
);
return
b
;
return
state
;
}
#if 0
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
...
...
@@ -618,6 +588,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
syncNodeRelease(pSyncNode);
return 0;
}
#endif
SyncIndex
syncNodeGetSnapshotConfigIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
snapshotLastApplyIndex
)
{
ASSERT
(
pSyncNode
->
pRaftCfg
->
configIndexCount
>=
1
);
...
...
@@ -635,23 +606,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
return
lastIndex
;
}
const
char
*
syncGetMyRoleStr
(
int64_t
rid
)
{
const
char
*
s
=
syncUtilState2String
(
syncGetMyRole
(
rid
));
return
s
;
}
bool
syncRestoreFinish
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
restoreFinish
=
pSyncNode
->
restoreFinish
;
syncNodeRelease
(
pSyncNode
);
return
restoreFinish
;
}
#if 0
SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
...
...
@@ -719,6 +674,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode);
}
#endif
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
...
...
@@ -742,7 +698,6 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease
(
pSyncNode
);
}
static
void
syncGetAndDelRespRpc
(
SSyncNode
*
pSyncNode
,
uint64_t
index
,
SRpcHandleInfo
*
pInfo
)
{
SRespStub
stub
;
int32_t
ret
=
syncRespMgrGetAndDel
(
pSyncNode
->
pSyncRespMgr
,
index
,
&
stub
);
...
...
@@ -877,7 +832,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
sError
(
"vgId:%d, sync propose not leader, %s, type:%s"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
sError
(
"vgId:%d, sync propose not leader, %s, type:%s"
,
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
goto
_END
;
}
...
...
@@ -1603,7 +1558,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars
cJSON_AddNumberToObject
(
pRoot
,
"state"
,
pSyncNode
->
state
);
cJSON_AddStringToObject
(
pRoot
,
"state_str"
,
sync
UtilState2String
(
pSyncNode
->
state
));
cJSON_AddStringToObject
(
pRoot
,
"state_str"
,
sync
Str
(
pSyncNode
->
state
));
cJSON_AddItemToObject
(
pRoot
,
"pRaftStore"
,
raftStore2Json
(
pSyncNode
->
pRaftStore
));
// tla+ candidate vars
...
...
@@ -1743,7 +1698,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pSyncNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
...
...
@@ -1767,7 +1722,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pSyncNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
...
...
@@ -1821,7 +1776,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pSyncNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
...
...
@@ -1843,7 +1798,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pSyncNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
...
...
@@ -1875,9 +1830,9 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
", sby:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d"
,
pSyncNode
->
vgId
,
sync
UtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
);
pSyncNode
->
vgId
,
sync
Str
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
);
return
s
;
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
21748def
...
...
@@ -2413,7 +2413,7 @@ cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) {
cJSON_AddNumberToObject
(
pRoot
,
"fsmMeta.isWeak"
,
pMsg
->
fsmMeta
.
isWeak
);
cJSON_AddNumberToObject
(
pRoot
,
"fsmMeta.code"
,
pMsg
->
fsmMeta
.
code
);
cJSON_AddNumberToObject
(
pRoot
,
"fsmMeta.state"
,
pMsg
->
fsmMeta
.
state
);
cJSON_AddStringToObject
(
pRoot
,
"fsmMeta.state.str"
,
sync
UtilState2String
(
pMsg
->
fsmMeta
.
state
));
cJSON_AddStringToObject
(
pRoot
,
"fsmMeta.state.str"
,
sync
Str
(
pMsg
->
fsmMeta
.
state
));
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
fsmMeta
.
seqNum
);
cJSON_AddStringToObject
(
pRoot
,
"fsmMeta.seqNum"
,
u64buf
);
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
21748def
...
...
@@ -176,18 +176,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
return
serialized
;
}
const
char
*
syncUtilState2String
(
ESyncState
state
)
{
if
(
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
return
"follower"
;
}
else
if
(
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
return
"candidate"
;
}
else
if
(
state
==
TAOS_SYNC_STATE_LEADER
)
{
return
"leader"
;
}
else
{
return
"state_error"
;
}
}
bool
syncUtilCanPrint
(
char
c
)
{
if
(
c
>=
32
&&
c
<=
126
)
{
return
true
;
...
...
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
21748def
...
...
@@ -47,8 +47,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s, flag:%"
PRIu64
", term:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
)
,
cbMeta
.
flag
,
cbMeta
.
term
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
),
cbMeta
.
flag
,
cbMeta
.
term
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
else
{
sTrace
(
"==callback== ==CommitCb== do not apply again %"
PRId64
,
cbMeta
.
index
);
...
...
@@ -57,10 +57,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
cbMeta
.
flag
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncStr
(
cbMeta
.
state
),
cbMeta
.
flag
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -68,8 +68,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
cbMeta
.
flag
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncStr
(
cbMeta
.
state
),
cbMeta
.
flag
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
21748def
...
...
@@ -45,8 +45,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
cbMeta
.
flag
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncStr
(
cbMeta
.
state
),
cbMeta
.
flag
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
else
{
sTrace
(
"==callback== ==CommitCb== do not apply again %"
PRId64
,
cbMeta
.
index
);
...
...
@@ -55,10 +54,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
cbMeta
.
flag
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncStr
(
cbMeta
.
state
),
cbMeta
.
flag
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -66,8 +65,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
cbMeta
.
flag
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncStr
(
cbMeta
.
state
),
cbMeta
.
flag
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
source/libs/sync/test/syncReplicateTest.cpp
浏览文件 @
21748def
...
...
@@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
else
{
sTrace
(
"==callback== ==CommitCb== do not apply again %"
PRId64
,
cbMeta
.
index
);
...
...
@@ -53,7 +53,7 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -61,7 +61,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
source/libs/sync/test/syncSnapshotTest.cpp
浏览文件 @
21748def
...
...
@@ -45,7 +45,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
else
{
sTrace
(
"==callback== ==CommitCb== do not apply again %"
PRId64
,
cbMeta
.
index
);
...
...
@@ -56,7 +56,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -64,7 +64,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
21748def
...
...
@@ -44,8 +44,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
", term:%"
PRIu64
" "
"currentTerm:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
)
,
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
),
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -56,8 +56,8 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
", term:%"
PRIu64
" "
"currentTerm:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
)
,
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
),
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -68,8 +68,8 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
", term:%"
PRIu64
" "
"currentTerm:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
)
,
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
),
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -168,8 +168,8 @@ void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbM
", isWeak:%d, code:%d, state:%d %s, flag:%"
PRIu64
", term:%"
PRIu64
" "
"currentTerm:%"
PRIu64
"
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
)
,
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
),
cbMeta
.
flag
,
cbMeta
.
term
,
cbMeta
.
currentTerm
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
source/libs/sync/test/syncWriteTest.cpp
浏览文件 @
21748def
...
...
@@ -35,7 +35,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -43,7 +43,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
@@ -51,7 +51,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
UtilState2String
(
cbMeta
.
state
));
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
sync
Str
(
cbMeta
.
state
));
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
...
...
tests/system-test/6-cluster/5dnode3mnodeAdd1Ddnoe.py
浏览文件 @
21748def
...
...
@@ -31,6 +31,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
...
...
@@ -118,6 +119,7 @@ class TDTestCase:
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -167,8 +169,8 @@ class TDTestCase:
threads
.
append
(
threading
.
Thread
(
target
=
clusterComCreate
.
insert_data
,
args
=
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])))
for
tr
in
threads
:
tr
.
start
()
dnode
6Port
=
int
(
6030
+
5
*
100
)
tdSql
.
execute
(
"create dnode '%s:%d'"
%
(
hostname
,
dnode
6
Port
))
dnode
7Port
=
int
(
6030
+
6
*
100
)
tdSql
.
execute
(
"create dnode '%s:%d'"
%
(
hostname
,
dnode
7
Port
))
clusterComCheck
.
checkDnodes
(
dnodeNumbers
)
for
tr
in
threads
:
tr
.
join
()
...
...
@@ -219,7 +221,7 @@ class TDTestCase:
tdSql
.
checkRows
(
rowsPerStb
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'dnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertData.py
浏览文件 @
21748def
...
...
@@ -31,6 +31,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
...
...
@@ -118,6 +119,7 @@ class TDTestCase:
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -157,7 +159,7 @@ class TDTestCase:
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
clusterComCreate
.
create_ctable
(
newTdSql
,
paraDict
[
"dbName"
],
stableName
,
stableName
,
paraDict
[
'ctbNum'
])
#insert dat
e
#insert dat
a
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
=
tdCom
.
newTdSql
()
...
...
@@ -203,17 +205,19 @@ class TDTestCase:
clusterComCheck
.
checkDbRows
(
dbNumbers
)
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
tdSql
.
execute
(
"use %s"
%
(
paraDict
[
"dbName"
]))
tdSql
.
query
(
"show stables"
)
tdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
# for i in range(paraDict['stbNumbers']):
# stableName= '%s_%d'%(paraDict['stbName'],i)
# tdSql.query("select * from %s"%stableName)
# tdSql.checkRows(rowsPerStb)
newTdSql
=
tdCom
.
newTdSql
()
newTdSql
.
execute
(
"reset query cache"
)
newTdSql
.
execute
(
"use %s"
%
(
paraDict
[
"dbName"
]))
newTdSql
.
query
(
"show %s.stables"
%
(
paraDict
[
"dbName"
]))
newTdSql
.
checkRows
(
paraDict
[
"stbNumbers"
])
for
i
in
range
(
paraDict
[
'stbNumbers'
]):
stableName
=
'%s_%d'
%
(
paraDict
[
'stbName'
],
i
)
newTdSql
.
query
(
"select * from %s"
%
stableName
)
newTdSql
.
checkRows
(
rowsPerStb
)
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'dnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
浏览文件 @
21748def
...
...
@@ -31,6 +31,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
...
...
@@ -118,6 +119,7 @@ class TDTestCase:
rowsPerStb
=
paraDict
[
"ctbNum"
]
*
paraDict
[
"rowsPerTbl"
]
rowsall
=
rowsPerStb
*
paraDict
[
'stbNumbers'
]
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -214,7 +216,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'dnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
浏览文件 @
21748def
...
...
@@ -30,7 +30,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -113,6 +113,7 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allDbNumbers
=
(
paraDict
[
'dbNumbers'
]
*
restartNumbers
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -198,7 +199,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
10
,
stopRole
=
'dnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
10
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
浏览文件 @
21748def
...
...
@@ -30,7 +30,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -87,6 +87,7 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -171,7 +172,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'dnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'dnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
浏览文件 @
21748def
...
...
@@ -30,7 +30,8 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
replicaVar
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -87,7 +88,8 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allDbNumbers
=
(
paraDict
[
'dbNumbers'
]
*
restartNumbers
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
paraDict
[
'replica'
]
=
int
(
self
.
replicaVar
)
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
...
...
@@ -171,7 +173,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'mnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
1
,
stopRole
=
'mnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
浏览文件 @
21748def
...
...
@@ -30,7 +30,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -112,6 +112,7 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -197,7 +198,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'mnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'mnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
浏览文件 @
21748def
...
...
@@ -30,6 +30,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
...
...
@@ -88,6 +89,7 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allDbNumbers
=
(
paraDict
[
'dbNumbers'
]
*
restartNumbers
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
paraDict
[
'replica'
]
=
self
.
replicaVar
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
...
...
@@ -171,7 +173,7 @@ class TDTestCase:
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
10
,
stopRole
=
'vnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
10
,
stopRole
=
'vnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
浏览文件 @
21748def
...
...
@@ -30,7 +30,7 @@ class TDTestCase:
self
.
TDDnodes
=
None
tdSql
.
init
(
conn
.
cursor
())
self
.
host
=
socket
.
gethostname
()
print
(
tdSql
)
self
.
replicaVar
=
int
(
replicaVar
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
@@ -112,8 +112,8 @@ class TDTestCase:
vnodeNumbers
=
int
(
dnodeNumbers
-
mnodeNums
)
allStbNumbers
=
(
paraDict
[
'stbNumbers'
]
*
restartNumbers
)
dbNumbers
=
1
paraDict
[
'replica'
]
=
self
.
replicaVar
print
(
tdSql
)
tdLog
.
info
(
"first check dnode and mnode"
)
tdSql
.
query
(
"select * from information_schema.ins_dnodes;"
)
tdSql
.
checkData
(
0
,
1
,
'%s:6030'
%
self
.
host
)
...
...
@@ -175,7 +175,7 @@ class TDTestCase:
# dnodeNumbers don't include database of schema
if
clusterComCheck
.
checkDnodes
(
dnodeNumbers
):
tdLog
.
info
(
"
123
"
)
tdLog
.
info
(
"
check numbers of dnodes right
"
)
else
:
print
(
"456"
)
...
...
@@ -192,14 +192,15 @@ class TDTestCase:
tdSql
.
execute
(
"use %s"
%
(
paraDict
[
"dbName"
]))
tdSql
.
query
(
"show stables"
)
tdLog
.
debug
(
"we find %d stables but exepect to create %d stables "
%
(
tdSql
.
queryRows
,
allStbNumbers
))
# # tdLog.info("check Stable Rows:")
tdSql
.
checkRows
(
allStbNumbers
)
if
self
.
replicaVar
==
1
:
tdSql
.
checkRows
(
allStbNumbers
)
else
:
tdLog
.
debug
(
"we find %d stables but exepect to create %d stables "
%
(
tdSql
.
queryRows
,
allStbNumbers
))
def
run
(
self
):
# print(self.master_dnode.cfgDict)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
5
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'vnode'
)
self
.
fiveDnodeThreeMnode
(
dnodeNumbers
=
6
,
mnodeNums
=
3
,
restartNumbers
=
2
,
stopRole
=
'vnode'
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/6-cluster/clusterCommonCheck.py
浏览文件 @
21748def
...
...
@@ -47,13 +47,14 @@ class ClusterComCheck:
for
i
in
range
(
dnodeNumbers
):
if
tdSql
.
queryResult
[
i
][
4
]
==
"ready"
:
status
+=
1
tdLog
.
info
(
status
)
#
tdLog.info(status)
if
status
==
dnodeNumbers
:
tdLog
.
success
(
"it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! "
%
(
dnodeNumbers
,
count
))
tdLog
.
success
(
"it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! "
%
(
dnodeNumbers
,
count
+
1
))
return
True
count
+=
1
time
.
sleep
(
1
)
count
+=
1
else
:
tdSql
.
query
(
"select * from information_schema.ins_dnodes"
)
tdLog
.
debug
(
tdSql
.
queryResult
)
...
...
@@ -74,10 +75,10 @@ class ClusterComCheck:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
exit
(
"we find %d databases but expect %d in clusters! "
%
(
tdSql
.
queryRows
,
dbNumbers
))
def
checkDb
(
self
,
dbNumbers
,
restartNumber
,
dbNameIndex
):
def
checkDb
(
self
,
dbNumbers
,
restartNumber
,
dbNameIndex
,
timeout
=
100
):
count
=
0
alldbNumbers
=
(
dbNumbers
*
restartNumber
)
+
2
while
count
<
5
:
while
count
<
timeout
:
query_status
=
0
for
j
in
range
(
dbNumbers
):
for
i
in
range
(
alldbNumbers
):
...
...
@@ -87,22 +88,24 @@ class ClusterComCheck:
query_status
+=
1
tdLog
.
debug
(
"check %s_%d that status is ready "
%
(
dbNameIndex
,
j
))
else
:
sleep
(
1
)
continue
# print(query_status)
count
+=
1
if
query_status
==
dbNumbers
:
tdLog
.
success
(
" check %d database and all databases are ready within
5s! "
%
dbNumbers
)
tdLog
.
success
(
" check %d database and all databases are ready within
%ds! "
%
(
dbNumbers
,
count
+
1
)
)
return
True
count
+=
1
else
:
tdLog
.
debug
(
tdSql
.
queryResult
)
tdLog
.
debug
(
"query status is %d"
%
query_status
)
tdLog
.
exit
(
"database is not ready within
5s"
)
tdLog
.
exit
(
"database is not ready within
%ds"
%
(
timeout
+
1
)
)
def
checkData
(
self
,
dbname
,
stbname
,
stableCount
,
CtableCount
,
rowsPerSTable
,):
tdSql
.
execute
(
"use %s"
%
dbname
)
tdSql
.
query
(
"show
stables"
)
tdSql
.
query
(
"show
%s.stables"
%
dbname
)
tdSql
.
checkRows
(
stableCount
)
tdSql
.
query
(
"show
tables"
)
tdSql
.
query
(
"show
%s.tables"
%
dbname
)
tdSql
.
checkRows
(
CtableCount
)
for
i
in
range
(
stableCount
):
tdSql
.
query
(
"select count(*) from %s%d"
%
(
stbname
,
i
))
...
...
tests/system-test/fulltest.sh
浏览文件 @
21748def
...
...
@@ -236,22 +236,29 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStop2Follower.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeStopLoop.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertData.py
-N
5
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
-N
5
-M
3
# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartMnodeInsertData.py -N 5 -M 3
# python3 ./test.py -f 6-cluster/5dnode3mnodeRestartVnodeInsertData.py -N 5 -M 3
python3 ./test.py
-f
6-cluster/5dnode3mnodeAdd1Ddnoe.py
-N
6
-M
3
-C
5
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertData.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertData.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
-N
6
-M
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py
-N
6
-M
3
-n
3
python3 ./test.py
-f
6-cluster/5dnode3mnodeAdd1Ddnoe.py
-N
7
-M
3
-C
6
python3 ./test.py
-f
6-cluster/5dnode3mnodeAdd1Ddnoe.py
-N
7
-M
3
-C
6
-n
3
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
python3 ./test.py
-f
6-cluster/5dnode3mnodeDrop.py
-N
5
# TD-19646 python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录