Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a46e3a9e
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
a46e3a9e
编写于
12月 19, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
12月 19, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9194 from taosdata/feature/dnode3
Feature/dnode3
上级
e0e9eca7
4681611e
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
634 addition
and
112 deletion
+634
-112
include/common/taosmsg.h
include/common/taosmsg.h
+13
-8
include/util/taoserror.h
include/util/taoserror.h
+12
-11
include/util/tdef.h
include/util/tdef.h
+1
-1
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+52
-36
source/dnode/mgmt/impl/test/CMakeLists.txt
source/dnode/mgmt/impl/test/CMakeLists.txt
+1
-1
source/dnode/mgmt/impl/test/db/db.cpp
source/dnode/mgmt/impl/test/db/db.cpp
+1
-0
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
+27
-0
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
+224
-0
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+2
-2
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+3
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+51
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+6
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+162
-46
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+78
-3
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
a46e3a9e
...
@@ -750,31 +750,36 @@ typedef struct {
...
@@ -750,31 +750,36 @@ typedef struct {
}
SReplica
;
}
SReplica
;
typedef
struct
{
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
vgId
;
int32_t
vgId
;
int32_t
dnodeId
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint64_t
dbUid
;
int32_t
cacheBlockSize
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
minRows
;
int32_t
maxRowsPerFileBlock
;
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int32_t
fsyncPeriod
;
int8_t
reserved
[
16
]
;
int8_t
walLevel
;
int8_t
precision
;
int8_t
precision
;
int8_t
compression
;
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
walLevel
;
int8_t
quorum
;
int8_t
quorum
;
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
replica
;
int8_t
replica
;
int8_t
selfIndex
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SCreateVnodeMsg
,
SAlterVnodeMsg
;
}
SCreateVnodeMsg
,
SAlterVnodeMsg
;
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
int32_t
vgId
;
int32_t
dnodeId
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint64_t
dbUid
;
}
SDropVnodeMsg
,
SSyncVnodeMsg
,
SCompactVnodeMsg
;
}
SDropVnodeMsg
,
SSyncVnodeMsg
,
SCompactVnodeMsg
;
typedef
struct
{
typedef
struct
{
...
...
include/util/taoserror.h
浏览文件 @
a46e3a9e
...
@@ -120,17 +120,18 @@ int32_t* taosGetErrno();
...
@@ -120,17 +120,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode-common
// mnode-common
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B)
// mnode-show
// mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
...
...
include/util/tdef.h
浏览文件 @
a46e3a9e
...
@@ -231,7 +231,7 @@ do { \
...
@@ -231,7 +231,7 @@ do { \
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES
64
#define TSDB_MIN_VNODES
16
#define TSDB_MAX_VNODES 512
#define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096
#define TSDB_MAX_VNODES_PER_DB 4096
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
a46e3a9e
...
@@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
...
@@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
)
{
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
int32_t
refCount
=
0
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
taosRLockLatch
(
&
pMgmt
->
latch
);
...
@@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
...
@@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
}
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
if
(
pVnode
!=
NULL
)
{
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
return
pVnode
;
return
pVnode
;
}
}
static
void
dndReleaseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
)
{
static
void
dndReleaseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
)
{
if
(
pVnode
==
NULL
)
return
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pVnode
!=
NULL
)
{
int32_t
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pVnode
!=
NULL
)
{
dTrace
(
"vgId:%d, release vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
dTrace
(
"vgId:%d, release vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
}
}
static
int32_t
dndCreateVnodeWrapper
(
SDnode
*
pDnode
,
int32_t
vgId
,
char
*
path
,
SVnode
*
pImpl
)
{
static
int32_t
dndCreateVnodeWrapper
(
SDnode
*
pDnode
,
int32_t
vgId
,
char
*
path
,
SVnode
*
pImpl
)
{
...
@@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
...
@@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt
->
totalVnodes
=
numOfVnodes
;
pMgmt
->
totalVnodes
=
numOfVnodes
;
int32_t
threadNum
=
tsN
umOfCores
;
int32_t
threadNum
=
pDnode
->
opt
.
n
umOfCores
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
@@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) {
...
@@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) {
static
int32_t
dndParseCreateVnodeReq
(
SRpcMsg
*
rpcMsg
,
int32_t
*
vgId
,
SVnodeCfg
*
pCfg
)
{
static
int32_t
dndParseCreateVnodeReq
(
SRpcMsg
*
rpcMsg
,
int32_t
*
vgId
,
SVnodeCfg
*
pCfg
)
{
SCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
SCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
*
vgId
=
htonl
(
pCreate
->
vgId
);
pCreate
->
vgId
=
htonl
(
pCreate
->
vgId
);
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
pCreate
->
dbUid
=
htobe64
(
pCreate
->
dbUid
);
pCreate
->
cacheBlockSize
=
htonl
(
pCreate
->
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pCreate
->
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pCreate
->
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pCreate
->
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pCreate
->
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pCreate
->
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pCreate
->
minRows
);
pCreate
->
maxRows
=
htonl
(
pCreate
->
maxRows
);
pCreate
->
commitTime
=
htonl
(
pCreate
->
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pCreate
->
fsyncPeriod
);
for
(
int
r
=
0
;
r
<
pCreate
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
pReplica
->
id
);
pReplica
->
port
=
htons
(
pReplica
->
port
);
}
*
vgId
=
pCreate
->
vgId
;
#if 0
#if 0
tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->wsize = pCreate->cacheBlockSize;
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->ssize = pCreate->cacheBlockSize;
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
pCfg->wsize = pCreate->cacheBlockSize;
pCfg->daysPerFile = htonl(pCreate->daysPerFile);
pCfg->lsize = pCreate->cacheBlockSize;
pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->isHeapAllocator = true;
pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->ttl = 4;
pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->keep = pCreate->daysToKeep0;
pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->isWeak = true;
pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
pCfg->precision = pCreate->precision;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
pCfg->compression = pCreate->compression;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
pCfg->cacheLastRow = pCreate->cacheLastRow;
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
pCfg->update = pCreate->update;
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
pCfg->quorum = pCreate->quorum;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
pCfg->replica = pCreate->replica;
pCfg->walCfg.level = pCreate->walLevel;
pCfg->walLevel = pCreate->walLevel;
pCfg->walCfg.retentionPeriod = 10;
pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod);
pCfg->walCfg.retentionSize = 128;
pCfg->walCfg.rollPeriod = 128;
for (int32_t i = 0; i < pCfg->replica; ++i) {
pCfg->walCfg.segSize = 128;
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
pCfg->walCfg.vgId = pCreate->vgId;
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
}
#endif
#endif
return
0
;
return
0
;
}
}
...
@@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
...
@@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SMWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
SMWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
pPool
->
name
=
"vnode-write"
;
pPool
->
name
=
"vnode-write"
;
pPool
->
max
=
tsN
umOfCores
;
pPool
->
max
=
pDnode
->
opt
.
n
umOfCores
;
if
(
tMWorkerInit
(
pPool
)
!=
0
)
{
if
(
tMWorkerInit
(
pPool
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
...
@@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
...
@@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
}
}
static
int32_t
dndInitVnodeSyncWorker
(
SDnode
*
pDnode
)
{
static
int32_t
dndInitVnodeSyncWorker
(
SDnode
*
pDnode
)
{
int32_t
maxThreads
=
tsN
umOfCores
/
2
;
int32_t
maxThreads
=
pDnode
->
opt
.
n
umOfCores
/
2
;
if
(
maxThreads
<
1
)
maxThreads
=
1
;
if
(
maxThreads
<
1
)
maxThreads
=
1
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
...
...
source/dnode/mgmt/impl/test/CMakeLists.txt
浏览文件 @
a46e3a9e
...
@@ -15,6 +15,6 @@ add_subdirectory(stb)
...
@@ -15,6 +15,6 @@ add_subdirectory(stb)
# add_subdirectory(telem)
# add_subdirectory(telem)
# add_subdirectory(trans)
# add_subdirectory(trans)
add_subdirectory
(
user
)
add_subdirectory
(
user
)
#
add_subdirectory(vgroup)
add_subdirectory
(
vgroup
)
# add_subdirectory(common)
# add_subdirectory(common)
source/dnode/mgmt/impl/test/db/db.cpp
浏览文件 @
a46e3a9e
...
@@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
...
@@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
// taosMsleep(1000000);
}
}
SendTheCheckShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
"show databases"
,
17
,
NULL
);
SendTheCheckShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
"show databases"
,
17
,
NULL
);
...
...
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
0 → 100644
浏览文件 @
a46e3a9e
add_executable
(
dnode_test_vgroup
""
)
target_sources
(
dnode_test_vgroup
PRIVATE
"vgroup.cpp"
"../sut/deploy.cpp"
)
target_link_libraries
(
dnode_test_vgroup
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories
(
dnode_test_vgroup
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/server/dnode/mgmt"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../../inc"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../sut"
)
add_test
(
NAME dnode_test_vgroup
COMMAND dnode_test_vgroup
)
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
0 → 100644
浏览文件 @
a46e3a9e
/**
* @file db.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module vgroup-msg tests
* @version 0.1
* @date 2021-12-20
*
* @copyright Copyright (c) 2021
*
*/
#include "deploy.h"
class
DndTestVgroup
:
public
::
testing
::
Test
{
protected:
static
SServer
*
CreateServer
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
)
{
SServer
*
pServer
=
createServer
(
path
,
fqdn
,
port
,
firstEp
);
ASSERT
(
pServer
);
return
pServer
;
}
static
void
SetUpTestSuite
()
{
initLog
(
"/tmp/tdlog"
);
const
char
*
fqdn
=
"localhost"
;
const
char
*
firstEp
=
"localhost:9150"
;
pServer
=
CreateServer
(
"/tmp/dnode_test_vgroup"
,
fqdn
,
9150
,
firstEp
);
pClient
=
createClient
(
"root"
,
"taosdata"
,
fqdn
,
9150
);
taosMsleep
(
1100
);
}
static
void
TearDownTestSuite
()
{
stopServer
(
pServer
);
dropClient
(
pClient
);
pServer
=
NULL
;
pClient
=
NULL
;
}
static
SServer
*
pServer
;
static
SClient
*
pClient
;
static
int32_t
connId
;
public:
void
SetUp
()
override
{}
void
TearDown
()
override
{}
void
SendTheCheckShowMetaMsg
(
int8_t
showType
,
const
char
*
showName
,
int32_t
columns
,
const
char
*
db
)
{
SShowMsg
*
pShow
=
(
SShowMsg
*
)
rpcMallocCont
(
sizeof
(
SShowMsg
));
pShow
->
type
=
showType
;
if
(
db
!=
NULL
)
{
strcpy
(
pShow
->
db
,
db
);
}
SRpcMsg
showRpcMsg
=
{
0
};
showRpcMsg
.
pCont
=
pShow
;
showRpcMsg
.
contLen
=
sizeof
(
SShowMsg
);
showRpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW
;
sendMsg
(
pClient
,
&
showRpcMsg
);
ASSERT_NE
(
pClient
->
pRsp
,
nullptr
);
ASSERT_EQ
(
pClient
->
pRsp
->
code
,
0
);
ASSERT_NE
(
pClient
->
pRsp
->
pCont
,
nullptr
);
SShowRsp
*
pShowRsp
=
(
SShowRsp
*
)
pClient
->
pRsp
->
pCont
;
ASSERT_NE
(
pShowRsp
,
nullptr
);
pShowRsp
->
showId
=
htonl
(
pShowRsp
->
showId
);
pMeta
=
&
pShowRsp
->
tableMeta
;
pMeta
->
numOfTags
=
htonl
(
pMeta
->
numOfTags
);
pMeta
->
numOfColumns
=
htonl
(
pMeta
->
numOfColumns
);
pMeta
->
sversion
=
htonl
(
pMeta
->
sversion
);
pMeta
->
tversion
=
htonl
(
pMeta
->
tversion
);
pMeta
->
tuid
=
htobe64
(
pMeta
->
tuid
);
pMeta
->
suid
=
htobe64
(
pMeta
->
suid
);
showId
=
pShowRsp
->
showId
;
EXPECT_NE
(
pShowRsp
->
showId
,
0
);
EXPECT_STREQ
(
pMeta
->
tbFname
,
showName
);
EXPECT_EQ
(
pMeta
->
numOfTags
,
0
);
EXPECT_EQ
(
pMeta
->
numOfColumns
,
columns
);
EXPECT_EQ
(
pMeta
->
precision
,
0
);
EXPECT_EQ
(
pMeta
->
tableType
,
0
);
EXPECT_EQ
(
pMeta
->
update
,
0
);
EXPECT_EQ
(
pMeta
->
sversion
,
0
);
EXPECT_EQ
(
pMeta
->
tversion
,
0
);
EXPECT_EQ
(
pMeta
->
tuid
,
0
);
EXPECT_EQ
(
pMeta
->
suid
,
0
);
}
void
CheckSchema
(
int32_t
index
,
int8_t
type
,
int32_t
bytes
,
const
char
*
name
)
{
SSchema
*
pSchema
=
&
pMeta
->
pSchema
[
index
];
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
type
);
EXPECT_EQ
(
pSchema
->
bytes
,
bytes
);
EXPECT_STREQ
(
pSchema
->
name
,
name
);
}
void
SendThenCheckShowRetrieveMsg
(
int32_t
rows
)
{
SRetrieveTableMsg
*
pRetrieve
=
(
SRetrieveTableMsg
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
pRetrieve
->
showId
=
htonl
(
showId
);
pRetrieve
->
free
=
0
;
SRpcMsg
retrieveRpcMsg
=
{
0
};
retrieveRpcMsg
.
pCont
=
pRetrieve
;
retrieveRpcMsg
.
contLen
=
sizeof
(
SRetrieveTableMsg
);
retrieveRpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW_RETRIEVE
;
sendMsg
(
pClient
,
&
retrieveRpcMsg
);
ASSERT_NE
(
pClient
->
pRsp
,
nullptr
);
ASSERT_EQ
(
pClient
->
pRsp
->
code
,
0
);
ASSERT_NE
(
pClient
->
pRsp
->
pCont
,
nullptr
);
pRetrieveRsp
=
(
SRetrieveTableRsp
*
)
pClient
->
pRsp
->
pCont
;
ASSERT_NE
(
pRetrieveRsp
,
nullptr
);
pRetrieveRsp
->
numOfRows
=
htonl
(
pRetrieveRsp
->
numOfRows
);
pRetrieveRsp
->
useconds
=
htobe64
(
pRetrieveRsp
->
useconds
);
pRetrieveRsp
->
compLen
=
htonl
(
pRetrieveRsp
->
compLen
);
EXPECT_EQ
(
pRetrieveRsp
->
numOfRows
,
rows
);
EXPECT_EQ
(
pRetrieveRsp
->
useconds
,
0
);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ
(
pRetrieveRsp
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
EXPECT_EQ
(
pRetrieveRsp
->
compressed
,
0
);
EXPECT_EQ
(
pRetrieveRsp
->
compLen
,
0
);
pData
=
pRetrieveRsp
->
data
;
pos
=
0
;
}
void
CheckInt8
(
int8_t
val
)
{
int8_t
data
=
*
((
int8_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int8_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt16
(
int16_t
val
)
{
int16_t
data
=
*
((
int16_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int16_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt32
(
int32_t
val
)
{
int32_t
data
=
*
((
int32_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int32_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt64
(
int64_t
val
)
{
int64_t
data
=
*
((
int64_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int64_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckTimestamp
()
{
int64_t
data
=
*
((
int64_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int64_t
);
EXPECT_GT
(
data
,
0
);
}
void
CheckBinary
(
const
char
*
val
,
int32_t
len
)
{
pos
+=
sizeof
(
VarDataLenT
);
char
*
data
=
(
char
*
)(
pData
+
pos
);
pos
+=
len
;
EXPECT_STREQ
(
data
,
val
);
}
int32_t
showId
;
STableMetaMsg
*
pMeta
;
SRetrieveTableRsp
*
pRetrieveRsp
;
char
*
pData
;
int32_t
pos
;
};
SServer
*
DndTestVgroup
::
pServer
;
SClient
*
DndTestVgroup
::
pClient
;
int32_t
DndTestVgroup
::
connId
;
TEST_F
(
DndTestVgroup
,
01
_Create_Restart_Drop_Vnode
)
{
{
SCreateVnodeMsg
*
pReq
=
(
SCreateVnodeMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateVnodeMsg
));
pReq
->
vgId
=
htonl
(
2
);
pReq
->
dnodeId
=
htonl
(
1
);
strcpy
(
pReq
->
db
,
"1.d1"
);
pReq
->
dbUid
=
htobe64
(
9527
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRows
=
htonl
(
100
);
pReq
->
minRows
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replica
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
pReq
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
1
);
pReplica
->
port
=
htons
(
9150
);
}
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SCreateVnodeMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CREATE_VNODE_IN
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
taosMsleep
(
1000000
);
}
}
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
a46e3a9e
...
@@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans);
...
@@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
a46e3a9e
...
@@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
...
@@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
);
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
);
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
SCreateVnodeMsg
*
mndBuildCreateVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
SDropVnodeMsg
*
mndBuildDropVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
a46e3a9e
...
@@ -285,10 +285,60 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
...
@@ -285,10 +285,60 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
}
}
static
int32_t
mndSetRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
static
int32_t
mndSetRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
for
(
int
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
SVgObj
*
pVgroup
=
pVgroups
+
v
;
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
return
-
1
;
}
SEpSet
epset
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SCreateVnodeMsg
*
pMsg
=
mndBuildCreateVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
if
(
mndTransAppendRedoAction
(
pTrans
,
&
epset
,
TSDB_MSG_TYPE_ALTER_VNODE_IN
,
sizeof
(
SCreateVnodeMsg
),
pMsg
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
}
return
0
;
return
0
;
}
}
static
int32_t
mndSetUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
static
int32_t
mndSetUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
for
(
int
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
SVgObj
*
pVgroup
=
pVgroups
+
v
;
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
return
-
1
;
}
SEpSet
epset
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SDropVnodeMsg
*
pMsg
=
mndBuildDropVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
if
(
mndTransAppendUndoAction
(
pTrans
,
&
epset
,
TSDB_MSG_TYPE_DROP_VNODE_IN
,
sizeof
(
SDropVnodeMsg
),
pMsg
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
}
return
0
;
return
0
;
}
}
...
@@ -644,7 +694,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
...
@@ -644,7 +694,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
return
-
1
;
return
-
1
;
}
}
int32_t
contLen
=
sizeof
(
SUseDbRsp
)
+
pDb
->
cfg
.
numOfVgroups
*
sizeof
(
SVgroupInfo
);
int32_t
contLen
=
sizeof
(
SUseDbRsp
)
+
pDb
->
cfg
.
numOfVgroups
*
sizeof
(
SVgroupInfo
);
SUseDbRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
SUseDbRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
a46e3a9e
...
@@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj
...
@@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj
}
}
SDnodeObj
*
mndAcquireDnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SDnodeObj
*
mndAcquireDnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
dnodeId
);
SDnodeObj
*
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
dnodeId
);
if
(
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
return
pDnode
;
}
}
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
)
{
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
a46e3a9e
...
@@ -21,6 +21,13 @@
...
@@ -21,6 +21,13 @@
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
#define TSDB_TRN_RESERVE_SIZE 64
typedef
struct
{
SEpSet
epSet
;
int8_t
msgType
;
int32_t
contLen
;
void
*
pCont
;
}
STransAction
;
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
...
@@ -29,9 +36,12 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
...
@@ -29,9 +36,12 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
int32_t
mndTransAppendArray
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
void
mndTransDropArray
(
SArray
*
pArray
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
void
mndTransDropLogs
(
SArray
*
pArray
);
static
void
mndTransDropActions
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
@@ -58,7 +68,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
...
@@ -58,7 +68,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
16
*
sizeof
(
int32_t
)
+
TSDB_TRN_RESERVE_SIZE
;
int32_t
rawDataLen
=
sizeof
(
STrans
)
+
TSDB_TRN_RESERVE_SIZE
;
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
...
@@ -80,6 +90,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
...
@@ -80,6 +90,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
}
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
i
);
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
undoActions
,
i
);
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
TSDB_TRANS_VER
,
rawDataLen
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
TSDB_TRANS_VER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
{
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
...
@@ -116,6 +136,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
...
@@ -116,6 +136,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
}
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
i
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
));
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgType
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAction
->
pCont
,
pAction
->
contLen
);
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
undoActions
,
i
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
));
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgType
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pCont
,
pAction
->
contLen
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
...
@@ -147,8 +183,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -147,8 +183,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
@@ -175,42 +211,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
...
@@ -175,42 +211,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for
(
int32_t
i
=
0
;
i
<
redoLogNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
redoLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pData
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
}
}
for
(
int32_t
i
=
0
;
i
<
undoLogNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
undoLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
undoLogs
,
&
pData
);
void
*
ret
=
taosArrayPush
(
pTrans
->
undoLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
}
}
for
(
int32_t
i
=
0
;
i
<
commitLogNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
commitLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
commitLogs
,
&
pData
);
void
*
ret
=
taosArrayPush
(
pTrans
->
commitLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
action
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
));
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
action
.
msgType
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
action
.
contLen
)
action
.
pCont
=
malloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
action
.
pCont
,
action
.
contLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoActions
,
&
action
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
action
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
));
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
action
.
msgType
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
action
.
contLen
)
action
.
pCont
=
malloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
action
.
pCont
,
action
.
contLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
undoActions
,
&
action
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
}
}
}
...
@@ -237,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
...
@@ -237,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform delete action, stage:%s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
));
mTrace
(
"trans:%d, perform delete action, stage:%s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
));
mndTransDrop
Array
(
pTrans
->
redoLogs
);
mndTransDrop
Logs
(
pTrans
->
redoLogs
);
mndTransDrop
Array
(
pTrans
->
undoLogs
);
mndTransDrop
Logs
(
pTrans
->
undoLogs
);
mndTransDrop
Array
(
pTrans
->
commitLogs
);
mndTransDrop
Logs
(
pTrans
->
commitLogs
);
mndTransDropA
rray
(
pTrans
->
redoActions
);
mndTransDropA
ctions
(
pTrans
->
redoActions
);
mndTransDropA
rray
(
pTrans
->
undoActions
);
mndTransDropA
ctions
(
pTrans
->
undoActions
);
return
0
;
return
0
;
}
}
...
@@ -274,6 +345,8 @@ char *mndTransStageStr(ETrnStage stage) {
...
@@ -274,6 +345,8 @@ char *mndTransStageStr(ETrnStage stage) {
return
"rollback"
;
return
"rollback"
;
case
TRN_STAGE_RETRY
:
case
TRN_STAGE_RETRY
:
return
"retry"
;
return
"retry"
;
case
TRN_STAGE_OVER
:
return
"stop"
;
default:
default:
return
"undefined"
;
return
"undefined"
;
}
}
...
@@ -305,8 +378,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
...
@@ -305,8 +378,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
@@ -319,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
...
@@ -319,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
pTrans
;
return
pTrans
;
}
}
static
void
mndTransDrop
Array
(
SArray
*
pArray
)
{
static
void
mndTransDrop
Logs
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
tfree
(
pRaw
);
tfree
(
pRaw
);
...
@@ -328,12 +401,21 @@ static void mndTransDropArray(SArray *pArray) {
...
@@ -328,12 +401,21 @@ static void mndTransDropArray(SArray *pArray) {
taosArrayDestroy
(
pArray
);
taosArrayDestroy
(
pArray
);
}
}
static
void
mndTransDropActions
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
free
(
pAction
->
pCont
);
}
taosArrayDestroy
(
pArray
);
}
void
mndTransDrop
(
STrans
*
pTrans
)
{
void
mndTransDrop
(
STrans
*
pTrans
)
{
mndTransDrop
Array
(
pTrans
->
redoLogs
);
mndTransDrop
Logs
(
pTrans
->
redoLogs
);
mndTransDrop
Array
(
pTrans
->
undoLogs
);
mndTransDrop
Logs
(
pTrans
->
undoLogs
);
mndTransDrop
Array
(
pTrans
->
commitLogs
);
mndTransDrop
Logs
(
pTrans
->
commitLogs
);
mndTransDropA
rray
(
pTrans
->
redoActions
);
mndTransDropA
ctions
(
pTrans
->
redoActions
);
mndTransDropA
rray
(
pTrans
->
undoActions
);
mndTransDropA
ctions
(
pTrans
->
undoActions
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
tfree
(
pTrans
);
tfree
(
pTrans
);
...
@@ -344,7 +426,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
...
@@ -344,7 +426,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
}
}
static
int32_t
mndTransAppend
Array
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
static
int32_t
mndTransAppend
Log
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
if
(
pArray
==
NULL
||
pRaw
==
NULL
)
{
if
(
pArray
==
NULL
||
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
...
@@ -360,32 +442,44 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
...
@@ -360,32 +442,44 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
}
}
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
redoLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
redoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
undoLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
undoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
commitLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
commitLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
redoActions
,
pMsg
);
STransAction
action
=
{.
epSet
=
*
pEpSet
,
.
msgType
=
msgType
,
.
contLen
=
contLen
,
.
pCont
=
pCont
};
mTrace
(
"trans:%d, msg:%p append to redo actions"
,
pTrans
->
id
,
pMsg
);
void
*
ptr
=
taosArrayPush
(
pArray
,
&
action
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
int32_t
code
=
mndTransAppendAction
(
pTrans
->
redoActions
,
pEpSet
,
msgType
,
contLen
,
pCont
);
mTrace
(
"trans:%d, msg:%s len:%d append to redo actions"
,
pTrans
->
id
,
taosMsg
[
msgType
],
contLen
);
return
code
;
return
code
;
}
}
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
int32_t
code
=
mndTransAppendA
rray
(
pTrans
->
undoActions
,
pMsg
);
int32_t
code
=
mndTransAppendA
ction
(
pTrans
->
undoActions
,
pEpSet
,
msgType
,
contLen
,
pCont
);
mTrace
(
"trans:%d, msg:%
p append to undo actions"
,
pTrans
->
id
,
pMsg
);
mTrace
(
"trans:%d, msg:%
s len:%d append to undo actions"
,
pTrans
->
id
,
taosMsg
[
msgType
],
contLen
);
return
code
;
return
code
;
}
}
...
@@ -502,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
...
@@ -502,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
// todo
// todo
}
}
static
int32_t
mndTransExecute
Array
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
static
int32_t
mndTransExecute
Logs
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
...
@@ -520,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
...
@@ -520,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
redoLogs
)
!=
0
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
redoLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
...
@@ -534,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -534,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
undoLogs
)
!=
0
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
undoLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
undoLogs
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
...
@@ -548,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -548,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
commitLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
}
else
{
...
@@ -559,18 +653,40 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
...
@@ -559,18 +653,40 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
return
code
;
return
code
;
}
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
!=
0
)
{
#if 0
mTrace
(
"trans:%d, execute redo actions finished"
,
pTrans
->
id
);
int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
}
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
#else
return
0
;
return
0
;
#endif
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
<=
0
)
return
0
;
mTrace
(
"trans:%d, start to execute redo actions"
,
pTrans
->
id
);
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
redoActions
);
}
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
!=
0
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
<=
0
)
return
0
;
mTrace
(
"trans:%d, execute undo actions finished"
,
pTrans
->
id
);
}
mTrace
(
"trans:%d, start to execute undo actions"
,
pTrans
->
id
);
return
0
;
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
undoActions
)
;
}
}
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
a46e3a9e
...
@@ -24,9 +24,10 @@
...
@@ -24,9 +24,10 @@
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64
#define TSDB_VGROUP_RESERVE_SIZE 64
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
);
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
);
...
@@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
...
@@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
}
}
SCreateVnodeMsg
*
mndBuildCreateVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SCreateVnodeMsg
*
pCreate
=
malloc
(
sizeof
(
SCreateVnodeMsg
));
if
(
pCreate
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pCreate
->
dnodeId
=
htonl
(
pDnode
->
id
);
pCreate
->
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pCreate
->
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
pCreate
->
dbUid
=
htobe64
(
pDb
->
uid
);
pCreate
->
cacheBlockSize
=
htonl
(
pDb
->
cfg
.
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pDb
->
cfg
.
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pDb
->
cfg
.
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pDb
->
cfg
.
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pDb
->
cfg
.
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pDb
->
cfg
.
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pDb
->
cfg
.
minRows
);
pCreate
->
maxRows
=
htonl
(
pDb
->
cfg
.
maxRows
);
pCreate
->
commitTime
=
htonl
(
pDb
->
cfg
.
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pDb
->
cfg
.
fsyncPeriod
);
pCreate
->
walLevel
=
pDb
->
cfg
.
walLevel
;
pCreate
->
precision
=
pDb
->
cfg
.
precision
;
pCreate
->
compression
=
pDb
->
cfg
.
compression
;
pCreate
->
quorum
=
pDb
->
cfg
.
quorum
;
pCreate
->
update
=
pDb
->
cfg
.
update
;
pCreate
->
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
pCreate
->
replica
=
pVgroup
->
replica
;
pCreate
->
selfIndex
=
-
1
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
v
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pVgidDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pVgidDnode
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
pReplica
->
id
=
htonl
(
pVgidDnode
->
id
);
pReplica
->
port
=
htons
(
pVgidDnode
->
port
);
memcpy
(
pReplica
->
fqdn
,
pVgidDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
pCreate
->
selfIndex
=
v
;
}
}
if
(
pCreate
->
selfIndex
==
-
1
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
return
pCreate
;
}
SDropVnodeMsg
*
mndBuildDropVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SDropVnodeMsg
*
pDrop
=
malloc
(
sizeof
(
SDropVnodeMsg
));
if
(
pDrop
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pDrop
->
dnodeId
=
htonl
(
pDnode
->
id
);
pDrop
->
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pDrop
->
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
pDrop
->
dbUid
=
htobe64
(
pDb
->
uid
);
return
pDrop
;
}
static
int32_t
mndGetAvailableDnode
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
)
{
static
int32_t
mndGetAvailableDnode
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
allocedVnodes
=
0
;
int32_t
allocedVnodes
=
0
;
...
...
source/util/src/terror.c
浏览文件 @
a46e3a9e
...
@@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
...
@@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_INVALID_INPUT
,
"Invalid tsc input"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_INVALID_INPUT
,
"Invalid tsc input"
)
// mnode-common
// mnode-common
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_APP_ERROR
,
"Mnode internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NOT_READY
,
"Cluster not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NOT_READY
,
"Cluster not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACTION_IN_PROGRESS
,
"Message is progressing"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACTION_IN_PROGRESS
,
"Message is progressing"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录