Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
43df4f1c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
43df4f1c
编写于
6月 05, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
6月 05, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13477 from taosdata/fix/mnode
refactor: add alter-confirm while alter db
上级
82c5308c
36fd0dac
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
284 addition
and
285 deletion
+284
-285
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-4
include/util/taoserror.h
include/util/taoserror.h
+3
-3
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+19
-9
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-1
source/dnode/mnode/impl/src/mndBnode.c
source/dnode/mnode/impl/src/mndBnode.c
+11
-21
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+67
-29
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+11
-21
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+11
-21
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+11
-21
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-21
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+11
-15
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+13
-17
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+10
-14
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+8
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+31
-53
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+48
-33
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+1
-1
tests/script/tsim/db/alter_replica_31.sim
tests/script/tsim/db/alter_replica_31.sim
+21
-0
未找到文件。
include/common/tmsgdef.h
浏览文件 @
43df4f1c
...
...
@@ -193,13 +193,11 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT_RSMA
,
"vnode-submit-rsma"
,
SSubmitReq
,
SSubmitRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_GET_TSMA_EXP_WNDS
,
"vnode-get-tsma-expired-windows"
,
SVGetTsmaExpWndsReq
,
SVGetTsmaExpWndsRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DELETE
,
"delete-data"
,
SVDeleteReq
,
SVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_CONFIG
,
"alter-config"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_REPLICA
,
"alter-replica"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_START_WRITE
,
"start-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_STOP_WRITE
,
"stop-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_CONFIRM_WRITE
,
"confirm-write"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_CONFIRM
,
"alter-confirm"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DELETE
,
"delete-data"
,
SVDeleteReq
,
SVDeleteRsp
)
TD_NEW_MSG_SEG
(
TDMT_QND_MSG
)
...
...
include/util/taoserror.h
浏览文件 @
43df4f1c
...
...
@@ -40,9 +40,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
//common & util
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x000
1
)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000
2
)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x000
3
)
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x000
3
)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000
4
)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x000
5
)
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010)
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011)
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012)
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
43df4f1c
...
...
@@ -213,6 +213,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TASK_DEPLOY_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_TIMEOUT
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
43df4f1c
...
...
@@ -361,6 +361,7 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
43df4f1c
...
...
@@ -118,25 +118,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
m
);
code
=
vnodePreprocessReq
(
pVnode
->
pImpl
,
pMsg
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
continue
;
if
(
code
!=
0
)
{
dError
(
"vgId:%d, msg:%p failed to write since %s"
,
pVnode
->
vgId
,
pMsg
,
tstrerror
(
code
));
vmSendRsp
(
pMsg
,
code
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
dTrace
(
"vgId:%d, msg:%p in progress and no rsp"
,
pVnode
->
vgId
,
pMsg
);
continue
;
}
code
=
syncPropose
(
sync
,
pMsg
,
false
);
if
(
pMsg
->
msgType
!=
TDMT_VND_ALTER_REPLICA
)
{
code
=
syncPropose
(
sync
,
pMsg
,
false
);
}
if
(
code
==
TAOS_SYNC_PROPOSE_SUCCESS
)
{
dTrace
(
"vgId:%d, msg:%p is proposed and no rsp"
,
pVnode
->
vgId
,
pMsg
);
continue
;
}
else
if
(
code
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
dTrace
(
"vgId:%d, msg:%p is redirect since not leader"
,
pVnode
->
vgId
,
pMsg
);
SEpSet
newEpSet
=
{
0
};
syncGetEpSet
(
sync
,
&
newEpSet
);
newEpSet
.
inUse
=
(
newEpSet
.
inUse
+
1
)
%
newEpSet
.
numOfEps
;
SEp
*
pEp
=
&
newEpSet
.
eps
[
newEpSet
.
inUse
];
if
(
pEp
->
port
==
tsServerPort
&&
strcmp
(
pEp
->
fqdn
,
tsLocalFqdn
)
==
0
)
{
newEpSet
.
inUse
=
(
newEpSet
.
inUse
+
1
)
%
newEpSet
.
numOfEps
;
}
dTrace
(
"vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d"
,
pVnode
->
vgId
,
pMsg
,
newEpSet
.
numOfEps
,
newEpSet
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
newEpSet
.
numOfEps
;
++
i
)
{
dTrace
(
"vgId:%d, msg:%p ep:%s:%u"
,
pVnode
->
vgId
,
pMsg
,
newEpSet
.
eps
[
i
].
fqdn
,
newEpSet
.
eps
[
i
].
port
);
}
SRpcMsg
rsp
=
{.
code
=
TSDB_CODE_RPC_REDIRECT
,
.
info
=
pMsg
->
info
};
tmsgSendRedirectRsp
(
&
rsp
,
&
newEpSet
);
}
else
{
dError
(
"vgId:%d, msg:%p failed to
write since %s"
,
pVnode
->
vgId
,
pMsg
,
tstrerror
(
code
)
);
dError
(
"vgId:%d, msg:%p failed to
propose write since %s, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
tstrerror
(
code
),
code
);
vmSendRsp
(
pMsg
,
code
);
}
}
...
...
@@ -251,7 +262,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch
(
qtype
)
{
case
QUERY_QUEUE
:
vnodePreprocessQueryMsg
(
pVnode
->
pImpl
,
pMsg
);
dTrace
(
"vgId:%d, msg:%p put into vnode-query queue"
,
pVnode
->
vgId
,
pMsg
);
taosWriteQitem
(
pVnode
->
pQueryQ
,
pMsg
);
break
;
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
43df4f1c
...
...
@@ -34,7 +34,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
SArray
*
mndBuildDnodesArray
(
SMnode
*
pMnode
);
int32_t
mndAddVnodeToVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
);
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
del1
,
SVnodeGid
*
del2
);
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
pVgId
);
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
,
bool
standby
);
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
...
...
source/dnode/mnode/impl/src/mndBnode.c
浏览文件 @
43df4f1c
...
...
@@ -30,25 +30,25 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
static
int32_t
mndBnodeActionUpdate
(
SSdb
*
pSdb
,
SBnodeObj
*
pOld
,
SBnodeObj
*
pNew
);
static
int32_t
mndBnodeActionDelete
(
SSdb
*
pSdb
,
SBnodeObj
*
pObj
);
static
int32_t
mndProcessCreateBnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCreateBnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessDropBnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropBnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndRetrieveBnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextBnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitBnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_BNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndBnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndBnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndBnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndBnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndBnodeActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_BNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndBnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndBnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndBnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndBnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndBnodeActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_BNODE
,
mndProcessCreateBnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_BNODE
,
mndProcessDropBnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_BNODE_RSP
,
mnd
ProcessCreateBnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_BNODE_RSP
,
mnd
ProcessDropBnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_BNODE_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_BNODE_RSP
,
mnd
TransProcess
Rsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_BNODE
,
mndRetrieveBnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_BNODE
,
mndCancelGetNextBnode
);
...
...
@@ -427,16 +427,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessCreateBnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessDropBnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndRetrieveBnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
43df4f1c
...
...
@@ -263,7 +263,8 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease
(
pSdb
,
pDb
);
}
static
int32_t
mndAddCreateVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
,
bool
standby
)
{
static
int32_t
mndAddCreateVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
,
bool
standby
)
{
STransAction
action
=
{
0
};
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
...
...
@@ -288,6 +289,32 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
return
0
;
}
static
int32_t
mndAddAlterVnodeConfirmAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
contLen
=
sizeof
(
SMsgHead
);
SMsgHead
*
pHead
=
taosMemoryMalloc
(
contLen
);
if
(
pHead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
action
.
pCont
=
pHead
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_ALTER_CONFIRM
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pHead
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndAddAlterVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
tmsg_t
msgType
)
{
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
...
...
@@ -415,7 +442,6 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
cacheLastRow
<
0
)
pCfg
->
cacheLastRow
=
TSDB_DEFAULT_CACHE_LAST_ROW
;
if
(
pCfg
->
numOfRetensions
<
0
)
pCfg
->
numOfRetensions
=
0
;
if
(
pCfg
->
schemaless
<
0
)
pCfg
->
schemaless
=
TSDB_DB_SCHEMALESS_OFF
;
}
static
int32_t
mndSetCreateDbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
...
...
@@ -726,30 +752,32 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
mndTransSetSerial
(
pTrans
);
if
(
newVgroup
.
replica
<
pDb
->
cfg
.
replications
)
{
mInfo
(
"db:%s, vgId:%d,
will add 2 vnodes, vn:0 dnode:%d
"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
mInfo
(
"db:%s, vgId:%d,
vn:0 dnode:%d, will add 2 vnodes
"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
pVgroup
->
vnodeGid
[
0
].
dnodeId
);
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVgroup
,
pArray
)
!=
0
)
{
mError
(
"db:%s, failed to add vnode to vgId:%d since %s"
,
pDb
->
name
,
newVgroup
.
vgId
,
terrstr
());
return
-
1
;
}
newVgroup
.
replica
=
pDb
->
cfg
.
replications
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
1
],
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
TDMT_VND_ALTER_REPLICA
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
&
newVgroup
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
2
],
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
TDMT_VND_ALTER_REPLICA
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
}
else
{
mInfo
(
"db:%s, vgId:%d, will remove 2 vnodes"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
);
SVnodeGid
del1
=
{
0
};
SVnodeGid
del2
=
{
0
};
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVgroup
,
pArray
,
&
del1
,
&
del2
)
!=
0
)
{
mError
(
"db:%s, failed to remove vnode from vgId:%d since %s"
,
pDb
->
name
,
newVgroup
.
vgId
,
terrstr
());
return
-
1
;
}
newVgroup
.
replica
=
pDb
->
cfg
.
replications
;
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVgroup
,
pArray
,
&
del1
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
TDMT_VND_ALTER_REPLICA
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
del1
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
SVnodeGid
del2
=
{
0
};
if
(
mndRemoveVnodeFromVgroup
(
pMnode
,
&
newVgroup
,
pArray
,
&
del2
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
TDMT_VND_ALTER_REPLICA
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
del2
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
}
SSdbRaw
*
pVgRaw
=
mndVgroupActionEncode
(
&
newVgroup
);
...
...
@@ -1341,7 +1369,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t
numOfTable
=
mndGetDBTableNum
(
pDb
,
pMnode
);
if
(
pDbVgVersion
->
vgVersion
>=
pDb
->
vgVersion
&&
numOfTable
==
pDbVgVersion
->
numOfTable
)
{
mDebug
(
"db:%s, version
&
numOfTable not changed"
,
pDbVgVersion
->
dbFName
);
mDebug
(
"db:%s, version
and
numOfTable not changed"
,
pDbVgVersion
->
dbFName
);
mndReleaseDb
(
pMnode
,
pDb
);
continue
;
}
else
{
...
...
@@ -1433,12 +1461,22 @@ const char *mndGetDbStr(const char *src) {
int64_t
getValOfDiffPrecision
(
int8_t
unit
,
int64_t
val
)
{
int64_t
v
=
0
;
switch
(
unit
)
{
case
's'
:
v
=
val
/
1000
;
break
;
case
'm'
:
v
=
val
/
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
];
break
;
case
'h'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
60
);
break
;
case
'd'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
24
*
60
);
break
;
case
'w'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
24
*
60
*
7
);
break
;
switch
(
unit
)
{
case
's'
:
v
=
val
/
1000
;
break
;
case
'm'
:
v
=
val
/
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
];
break
;
case
'h'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
60
);
break
;
case
'd'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
24
*
60
);
break
;
case
'w'
:
v
=
val
/
(
tsTickPerMin
[
TSDB_TIME_PRECISION_MILLI
]
*
24
*
60
*
7
);
break
;
default:
break
;
}
...
...
@@ -1446,32 +1484,32 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
return
v
;
}
char
*
buildRetension
(
SArray
*
pRetension
)
{
char
*
buildRetension
(
SArray
*
pRetension
)
{
size_t
size
=
taosArrayGetSize
(
pRetension
);
if
(
size
==
0
)
{
return
NULL
;
}
char
*
p1
=
taosMemoryCalloc
(
1
,
100
);
SRetention
*
p
=
taosArrayGet
(
pRetension
,
0
);
char
*
p1
=
taosMemoryCalloc
(
1
,
100
);
SRetention
*
p
=
taosArrayGet
(
pRetension
,
0
);
int32_t
len
=
2
;
int64_t
v1
=
getValOfDiffPrecision
(
p
->
freqUnit
,
p
->
freq
);
int64_t
v2
=
getValOfDiffPrecision
(
p
->
keepUnit
,
p
->
keep
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c,"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c,"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
p
=
taosArrayGet
(
pRetension
,
1
);
v1
=
getValOfDiffPrecision
(
p
->
freqUnit
,
p
->
freq
);
v2
=
getValOfDiffPrecision
(
p
->
keepUnit
,
p
->
keep
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c,"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c,"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
p
=
taosArrayGet
(
pRetension
,
2
);
v1
=
getValOfDiffPrecision
(
p
->
freqUnit
,
p
->
freq
);
v2
=
getValOfDiffPrecision
(
p
->
keepUnit
,
p
->
keep
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
len
+=
sprintf
(
p1
+
len
,
"%"
PRId64
"%c:%"
PRId64
"%c"
,
v1
,
p
->
freqUnit
,
v2
,
p
->
keepUnit
);
varDataSetLen
(
p1
,
len
);
return
p1
;
...
...
@@ -1586,7 +1624,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
}
colDataAppend(pColInfo, rows, cacheModel, null);
#endif
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
pDb
->
cfg
.
cacheLastRow
,
false
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
pDb
->
cfg
.
cacheLastRow
,
false
);
char
*
prec
=
NULL
;
switch
(
pDb
->
cfg
.
precision
)
{
...
...
@@ -1618,7 +1656,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
pDb
->
cfg
.
schemaless
,
false
);
char
*
p
=
buildRetension
(
pDb
->
cfg
.
pRetensions
);
char
*
p
=
buildRetension
(
pDb
->
cfg
.
pRetensions
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
);
if
(
p
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
43df4f1c
...
...
@@ -30,26 +30,26 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
static
int32_t
mndQnodeActionUpdate
(
SSdb
*
pSdb
,
SQnodeObj
*
pOld
,
SQnodeObj
*
pNew
);
static
int32_t
mndQnodeActionDelete
(
SSdb
*
pSdb
,
SQnodeObj
*
pObj
);
static
int32_t
mndProcessCreateQnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCreateQnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessDropQnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropQnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessQnodeListReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveQnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextQnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitQnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_QNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndQnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndQnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndQnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndQnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndQnodeActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_QNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndQnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndQnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndQnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndQnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndQnodeActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_QNODE
,
mndProcessCreateQnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_QNODE
,
mndProcessDropQnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_QNODE_RSP
,
mnd
ProcessCreateQnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_QNODE_RSP
,
mnd
ProcessDropQnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_QNODE_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_QNODE_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_QNODE_LIST
,
mndProcessQnodeListReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_QNODE
,
mndRetrieveQnodes
);
...
...
@@ -503,16 +503,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessCreateQnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessDropQnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndRetrieveQnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
43df4f1c
...
...
@@ -38,25 +38,25 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
static
int32_t
mndSmaActionUpdate
(
SSdb
*
pSdb
,
SSmaObj
*
pOld
,
SSmaObj
*
pNew
);
static
int32_t
mndProcessMCreateSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessMDropSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessVCreateSmaRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessVDropSmaRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessGetSmaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSma
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SMA
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSmaActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSmaActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSmaActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSmaActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSmaActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_SMA
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSmaActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSmaActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSmaActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSmaActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSmaActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_SMA
,
mndProcessMCreateSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SMA
,
mndProcessMDropSmaReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mnd
ProcessVCreateSma
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mnd
ProcessVDropSma
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_SMA_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_SMA_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_INDEX
,
mndProcessGetSmaReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_INDEX
,
mndRetrieveSma
);
...
...
@@ -637,11 +637,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessVCreateSmaRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndSetDropSmaRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SSmaObj
*
pSma
)
{
SSdbRaw
*
pRedoRaw
=
mndSmaActionEncode
(
pSma
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
...
...
@@ -910,11 +905,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessVDropSmaRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndRetrieveSma
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndSnode.c
浏览文件 @
43df4f1c
...
...
@@ -30,25 +30,25 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static
int32_t
mndSnodeActionUpdate
(
SSdb
*
pSdb
,
SSnodeObj
*
pOld
,
SSnodeObj
*
pNew
);
static
int32_t
mndSnodeActionDelete
(
SSdb
*
pSdb
,
SSnodeObj
*
pObj
);
static
int32_t
mndProcessCreateSnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCreateSnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessDropSnodeReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropSnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndRetrieveSnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSnodeActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_SNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSnodeActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_SNODE
,
mndProcessCreateSnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SNODE
,
mndProcessDropSnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_SNODE_RSP
,
mnd
ProcessCreateSnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_SNODE_RSP
,
mnd
ProcessDropSnode
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_SNODE_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_SNODE_RSP
,
mnd
TransProcess
Rsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_SNODE
,
mndRetrieveSnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_SNODE
,
mndCancelGetNextSnode
);
...
...
@@ -437,16 +437,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessCreateSnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessDropSnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndRetrieveSnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
43df4f1c
...
...
@@ -38,9 +38,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static
int32_t
mndProcessMCreateStbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessMAlterStbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessMDropStbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessVCreateStbRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessVAlterStbRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessVDropStbRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessTableMetaReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveStb
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextStb
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -59,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_STB
,
mndProcessMCreateStbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_STB
,
mndProcessMAlterStbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_STB
,
mndProcessMDropStbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_STB_RSP
,
mnd
ProcessVCreateStb
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_STB_RSP
,
mnd
ProcessVAlterStb
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_STB_RSP
,
mnd
ProcessVDropStb
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_CREATE_STB_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_STB_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_STB_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TABLE_META
,
mndProcessTableMetaReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_STB
,
mndRetrieveStb
);
...
...
@@ -837,11 +834,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessVCreateStbRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndCheckAlterStbReq
(
SMAlterStbReq
*
pAlter
)
{
if
(
pAlter
->
commentLen
!=
0
||
pAlter
->
ttl
!=
0
)
return
0
;
...
...
@@ -1459,11 +1451,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessVAlterStbRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndSetDropStbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStbObj
*
pStb
)
{
SSdbRaw
*
pRedoRaw
=
mndStbActionEncode
(
pStb
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
...
...
@@ -1599,11 +1586,6 @@ _OVER:
return
code
;
}
static
int32_t
mndProcessVDropStbRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessTableMetaReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
43df4f1c
...
...
@@ -35,7 +35,6 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static
int32_t
mndStreamActionDelete
(
SSdb
*
pSdb
,
SStreamObj
*
pStream
);
static
int32_t
mndStreamActionUpdate
(
SSdb
*
pSdb
,
SStreamObj
*
pStream
,
SStreamObj
*
pNewStream
);
static
int32_t
mndProcessCreateStreamReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessTaskDeployInternalRsp
(
SRpcMsg
*
pRsp
);
/*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static
int32_t
mndProcessStreamMetaReq
(
SRpcMsg
*
pReq
);
...
...
@@ -44,17 +43,19 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static
void
mndCancelGetNextStream
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitStream
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_STREAM
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndStreamActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndStreamActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndStreamActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndStreamActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndStreamActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_STREAM
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndStreamActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndStreamActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndStreamActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndStreamActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndStreamActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_STREAM
,
mndProcessCreateStreamReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_TASK_DEPLOY_RSP
,
mnd
ProcessTaskDeployInternal
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_SND_TASK_DEPLOY_RSP
,
mnd
ProcessTaskDeployInternal
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_TASK_DEPLOY_RSP
,
mnd
TransProcess
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_SND_TASK_DEPLOY_RSP
,
mnd
TransProcess
Rsp
);
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
...
...
@@ -195,11 +196,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease
(
pSdb
,
pStream
);
}
static
int32_t
mndProcessTaskDeployInternalRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
SDbObj
*
mndAcquireDbByStream
(
SMnode
*
pMnode
,
char
*
streamName
)
{
SName
name
=
{
0
};
tNameFromString
(
&
name
,
streamName
,
T_NAME_ACCT
|
T_NAME_DB
|
T_NAME_TABLE
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
43df4f1c
...
...
@@ -43,7 +43,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
static
int32_t
mndProcessRebalanceReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndProcessDropCgroupReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndProcessSubscribeInternalRsp
(
SRpcMsg
*
pMsg
);
static
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSubscribe
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -65,20 +64,22 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO
}
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSubActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSubActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSubActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSubActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_VG_DELETE_RSP
,
mndProcessSubscribeInternalRsp
);
SSdbTable
table
=
{
.
sdbType
=
SDB_SUBSCRIBE
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSubActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSubActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSubActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSubActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_VG_DELETE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessRebalanceReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessRebalanceReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DROP_CGROUP
,
mndProcessDropCgroupReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mnd
ProcessSubscribeInternal
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mnd
TransProcess
Rsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_SUBSCRIPTIONS
,
mndRetrieveSubscribe
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextSubscribe
);
...
...
@@ -789,11 +790,6 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease
(
pSdb
,
pSub
);
}
static
int32_t
mndProcessSubscribeInternalRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndSetDropSubRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqSubscribeObj
*
pSub
)
{
SSdbRaw
*
pRedoRaw
=
mndSubActionEncode
(
pSub
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
43df4f1c
...
...
@@ -37,7 +37,6 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
,
SMqTopicObj
*
pNewTopic
);
static
int32_t
mndProcessCreateTopicReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropTopicReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropTopicInRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndRetrieveTopic
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextTopic
(
SMnode
*
pMnode
,
void
*
pIter
);
...
...
@@ -45,17 +44,19 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static
int32_t
mndSetDropTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqTopicObj
*
pTopic
);
int32_t
mndInitTopic
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_TOPIC
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndTopicActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndTopicActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndTopicActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndTopicActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndTopicActionDelete
};
SSdbTable
table
=
{
.
sdbType
=
SDB_TOPIC
,
.
keyType
=
SDB_KEY_BINARY
,
.
encodeFp
=
(
SdbEncodeFp
)
mndTopicActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndTopicActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndTopicActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndTopicActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndTopicActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_TOPIC
,
mndProcessCreateTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_TOPIC
,
mndProcessDropTopicReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TOPIC_RSP
,
mnd
ProcessDropTopicIn
Rsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_DROP_TOPIC_RSP
,
mnd
TransProcess
Rsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndRetrieveTopic
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_TOPICS
,
mndCancelGetNextTopic
);
...
...
@@ -607,11 +608,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessDropTopicInRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndGetNumOfTopics
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfTopics
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
43df4f1c
...
...
@@ -870,8 +870,15 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction
->
rawWritten
=
0
;
pAction
->
msgSent
=
0
;
pAction
->
msgReceived
=
0
;
if
(
pAction
->
errCode
==
TSDB_CODE_RPC_REDIRECT
)
{
pAction
->
epSet
.
inUse
=
(
pAction
->
epSet
.
inUse
+
1
)
%
pAction
->
epSet
.
numOfEps
;
mDebug
(
"trans:%d, %s:%d execute status is reset and set epset inuse:%d"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
,
pAction
->
epSet
.
inUse
);
}
else
{
mDebug
(
"trans:%d, %s:%d execute status is reset"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
}
pAction
->
errCode
=
0
;
mDebug
(
"trans:%d, %s:%d execute status is reset"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
action
);
}
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
43df4f1c
...
...
@@ -29,11 +29,6 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOld
,
SVgObj
*
pNew
);
static
int32_t
mndProcessCreateVnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessAlterVnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessDropVnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndProcessCompactVnodeRsp
(
SRpcMsg
*
pRsp
);
static
int32_t
mndRetrieveVgroups
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextVgroup
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndRetrieveVnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
...
...
@@ -50,11 +45,12 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.
deleteFp
=
(
SdbDeleteFp
)
mndVgroupActionDelete
,
};
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndProcessCreateVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_REPLICA_RSP
,
mndProcessAlterVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_CONFIG_RSP
,
mndProcessAlterVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_VNODE_RSP
,
mndProcessDropVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_COMPACT_RSP
,
mndProcessCompactVnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_VNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_REPLICA_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_CONFIG_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_ALTER_CONFIRM_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_VNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_COMPACT_RSP
,
mndTransProcessRsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_VGROUP
,
mndRetrieveVgroups
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_VGROUP
,
mndCancelGetNextVgroup
);
...
...
@@ -512,12 +508,12 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
mDebug
(
"dnode:%d, equivalent vnodes:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
);
}
int32_t
maxPos
=
1
;
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
]
;
for
(
int32_t
d
=
0
;
d
<
taosArrayGetSize
(
pArray
);
++
d
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
d
);
bool
used
=
false
;
for
(
int32_t
vn
=
0
;
vn
<
maxPos
;
++
vn
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
if
(
pDnode
->
id
==
pVgroup
->
vnodeGid
[
vn
].
dnodeId
)
{
used
=
true
;
break
;
...
...
@@ -530,59 +526,58 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return
-
1
;
}
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
maxPos
];
pVgid
->
dnodeId
=
pDnode
->
id
;
pVgid
->
role
=
TAOS_SYNC_STATE_ERROR
;
pDnode
->
numOfVnodes
++
;
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d, is added"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
pVgroup
->
replica
,
pVgid
->
dnodeId
);
mInfo
(
"db:%s, vgId:%d, vnode_index:%d dnode:%d is added"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
maxPos
,
pVgid
->
dnodeId
)
;
maxPo
s
++
;
if
(
maxPos
==
3
)
return
0
;
pVgroup
->
replica
++
;
pDnode
->
numOfVnode
s
++
;
return
0
;
}
terrno
=
TSDB_CODE_MND_NO_ENOUGH_DNODES
;
mError
(
"db:%s, failed to add vnode to vgId:%d since %s"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
terrstr
());
return
-
1
;
}
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
del1
,
SVnodeGid
*
del2
)
{
int32_t
mndRemoveVnodeFromVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
,
SVnodeGid
*
pDelVgid
)
{
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareDnodeVnodes
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
i
);
mDebug
(
"dnode:%d, equivalent vnodes:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
);
}
int32_t
removedNum
=
0
;
int32_t
code
=
-
1
;
for
(
int32_t
d
=
taosArrayGetSize
(
pArray
)
-
1
;
d
>=
0
;
--
d
)
{
SDnodeObj
*
pDnode
=
taosArrayGet
(
pArray
,
d
);
for
(
int32_t
vn
=
0
;
vn
<
TSDB_MAX_REPLICA
;
++
vn
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
vn
];
if
(
pVgid
->
dnodeId
==
pDnode
->
id
)
{
if
(
removedNum
==
0
)
*
del1
=
*
pVgid
;
if
(
removedNum
==
1
)
*
del2
=
*
pVgid
;
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d is removed"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
vn
,
pVgid
->
dnodeId
);
memset
(
pVgid
,
0
,
sizeof
(
SVnodeGid
));
removedNum
++
;
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d, is removed"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
vn
,
pVgid
->
dnodeId
);
pDnode
->
numOfVnodes
--
;
if
(
removedNum
==
2
)
goto
_OVER
;
pVgroup
->
replica
--
;
*
pDelVgid
=
*
pVgid
;
*
pVgid
=
pVgroup
->
vnodeGid
[
pVgroup
->
replica
];
memset
(
&
pVgroup
->
vnodeGid
[
pVgroup
->
replica
],
0
,
sizeof
(
SVnodeGid
));
code
=
0
;
goto
_OVER
;
}
}
}
_OVER:
if
(
removedNum
!=
2
)
return
-
1
;
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_APP_ERROR
;
mError
(
"db:%s, failed to remove vnode from vgId:%d since %s"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
terrstr
());
return
-
1
;
}
for
(
int32_t
vn
=
1
;
vn
<
TSDB_MAX_REPLICA
;
++
vn
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
vn
];
if
(
pVgid
->
dnodeId
!=
0
)
{
memcpy
(
&
pVgroup
->
vnodeGid
[
0
],
pVgid
,
sizeof
(
SVnodeGid
));
memset
(
pVgid
,
0
,
sizeof
(
SVnodeGid
));
}
mInfo
(
"db:%s, vgId:%d, vn:%d dnode:%d is reserved"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
vn
,
pVgid
->
dnodeId
);
}
mInfo
(
"db:%s, vgId:%d, dnode:%d is keeped"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
,
pVgroup
->
vnodeGid
[
0
].
dnodeId
);
return
0
;
}
...
...
@@ -605,23 +600,6 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
return
epset
;
}
static
int32_t
mndProcessCreateVnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessAlterVnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessDropVnodeRsp
(
SRpcMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
int32_t
mndProcessCompactVnodeRsp
(
SRpcMsg
*
pRsp
)
{
return
0
;
}
static
bool
mndGetVgroupMaxReplicaFp
(
SMnode
*
pMnode
,
void
*
pObj
,
void
*
p1
,
void
*
p2
,
void
*
p3
)
{
SVgObj
*
pVgroup
=
pObj
;
int64_t
uid
=
*
(
int64_t
*
)
p1
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
43df4f1c
...
...
@@ -15,14 +15,15 @@
#include "vnd.h"
static
int
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessAlterStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessDropStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessCreateTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessAlterTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessAlterStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDropStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessCreateTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessAlterTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessAlterConfirmReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
int32_t
vnodePreprocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
...
...
@@ -99,11 +100,11 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
return
code
;
}
int
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
)
{
void
*
ptr
=
NULL
;
void
*
pReq
;
int
len
;
int
ret
;
int
32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
)
{
void
*
ptr
=
NULL
;
void
*
pReq
;
int
32_t
len
;
int
32_t
ret
;
vTrace
(
"vgId:%d, start to process write request %s, version %"
PRId64
,
TD_VID
(
pVnode
),
TMSG_INFO
(
pMsg
->
msgType
),
version
);
...
...
@@ -158,6 +159,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
case
TDMT_VND_ALTER_CONFIRM
:
vnodeProcessAlterConfirmReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
);
break
;
case
TDMT_VND_ALTER_CONFIG
:
break
;
default:
...
...
@@ -190,7 +194,7 @@ _err:
return
-
1
;
}
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
if
(
TDMT_VND_QUERY
!=
pMsg
->
msgType
)
{
return
0
;
}
...
...
@@ -212,7 +216,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
}
int
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
int
32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
...
...
@@ -267,7 +271,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp
->
precision
=
pVnode
->
config
.
tsdbCfg
.
precision
;
}
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int
32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
int32_t
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
if
(
syncEnvIsStart
())
{
...
...
@@ -357,7 +361,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return
ret
;
}
static
int
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
in
t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_
t
len
,
SRpcMsg
*
pRsp
)
{
SVCreateStbReq
req
=
{
0
};
SDecoder
coder
;
...
...
@@ -389,9 +393,9 @@ _err:
return
-
1
;
}
static
int
vnodeProcessCreateTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
in
t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessCreateTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_
t
len
,
SRpcMsg
*
pRsp
)
{
SDecoder
decoder
=
{
0
};
int
rcode
=
0
;
int
32_t
rcode
=
0
;
SVCreateTbBatchReq
req
=
{
0
};
SVCreateTbReq
*
pCreateReq
;
SVCreateTbBatchRsp
rsp
=
{
0
};
...
...
@@ -422,7 +426,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
}
// loop to create table
for
(
int
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
for
(
int
32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
// validate hash
...
...
@@ -477,7 +481,7 @@ _exit:
return
rcode
;
}
static
int
vnodeProcessAlterStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessAlterStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVCreateStbReq
req
=
{
0
};
SDecoder
dc
=
{
0
};
...
...
@@ -506,9 +510,9 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq,
return
0
;
}
static
int
vnodeProcessDropStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessDropStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVDropStbReq
req
=
{
0
};
int
rcode
=
TSDB_CODE_SUCCESS
;
int
32_t
rcode
=
TSDB_CODE_SUCCESS
;
SDecoder
decoder
=
{
0
};
pRsp
->
msgType
=
TDMT_VND_CREATE_STB_RSP
;
...
...
@@ -535,12 +539,12 @@ _exit:
return
0
;
}
static
int
vnodeProcessAlterTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessAlterTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVAlterTbReq
vAlterTbReq
=
{
0
};
SVAlterTbRsp
vAlterTbRsp
=
{
0
};
SDecoder
dc
=
{
0
};
int
rcode
=
0
;
int
ret
;
int
32_t
rcode
=
0
;
int
32_t
ret
;
SEncoder
ec
=
{
0
};
STableMetaRsp
vMetaRsp
=
{
0
};
...
...
@@ -582,12 +586,12 @@ _exit:
return
0
;
}
static
int
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessDropTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVDropTbBatchReq
req
=
{
0
};
SVDropTbBatchRsp
rsp
=
{
0
};
SDecoder
decoder
=
{
0
};
SEncoder
encoder
=
{
0
};
int
ret
;
int
32_t
ret
;
SArray
*
tbUids
=
NULL
;
pRsp
->
msgType
=
TDMT_VND_DROP_TABLE_RSP
;
...
...
@@ -609,7 +613,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
rsp
.
pArray
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SVDropTbRsp
));
if
(
tbUids
==
NULL
||
rsp
.
pArray
==
NULL
)
goto
_exit
;
for
(
int
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
for
(
int
32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
SVDropTbReq
*
pDropTbReq
=
req
.
pReqs
+
iReq
;
SVDropTbRsp
dropTbRsp
=
{
0
};
...
...
@@ -641,7 +645,8 @@ _exit:
return
0
;
}
static
int
vnodeDebugPrintSingleSubmitMsg
(
SMeta
*
pMeta
,
SSubmitBlk
*
pBlock
,
SSubmitMsgIter
*
msgIter
,
const
char
*
tags
)
{
static
int32_t
vnodeDebugPrintSingleSubmitMsg
(
SMeta
*
pMeta
,
SSubmitBlk
*
pBlock
,
SSubmitMsgIter
*
msgIter
,
const
char
*
tags
)
{
SSubmitBlkIter
blkIter
=
{
0
};
STSchema
*
pSchema
=
NULL
;
tb_uid_t
suid
=
0
;
...
...
@@ -675,7 +680,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
return
TSDB_CODE_SUCCESS
;
}
static
int
vnodeDebugPrintSubmitMsg
(
SVnode
*
pVnode
,
SSubmitReq
*
pMsg
,
const
char
*
tags
)
{
static
int
32_t
vnodeDebugPrintSubmitMsg
(
SVnode
*
pVnode
,
SSubmitReq
*
pMsg
,
const
char
*
tags
)
{
ASSERT
(
pMsg
!=
NULL
);
SSubmitMsgIter
msgIter
=
{
0
};
SMeta
*
pMeta
=
pVnode
->
pMeta
;
...
...
@@ -692,7 +697,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
return
0
;
}
static
int
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SSubmitReq
*
pSubmitReq
=
(
SSubmitReq
*
)
pReq
;
SSubmitRsp
submitRsp
=
{
0
};
SSubmitMsgIter
msgIter
=
{
0
};
...
...
@@ -808,7 +813,7 @@ _exit:
return
0
;
}
static
int
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
static
int
32_t
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SVCreateTSmaReq
req
=
{
0
};
SDecoder
coder
;
...
...
@@ -859,3 +864,13 @@ _err:
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
)
{
return
vnodeProcessCreateTSmaReq
(
pVnode
,
1
,
pCont
,
contLen
,
NULL
);
}
static
int32_t
vnodeProcessAlterConfirmReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
vInfo
(
"vgId:%d, alter replica confim msg is processed"
,
TD_VID
(
pVnode
));
pRsp
->
msgType
=
TDMT_VND_ALTER_CONFIRM_RSP
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
pRsp
->
pCont
=
NULL
;
pRsp
->
contLen
=
0
;
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
43df4f1c
...
...
@@ -69,7 +69,7 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t
code
=
syncReconfig
(
pVnode
->
sync
,
&
cfg
);
if
(
code
==
TAOS_SYNC_PROPOSE_SUCCESS
)
{
// todo refactor
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
,
.
code
=
terrno
};
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
,
.
code
=
0
};
tmsgSendRsp
(
&
rsp
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
...
...
tests/script/tsim/db/alter_replica_31.sim
浏览文件 @
43df4f1c
...
...
@@ -78,6 +78,15 @@ if $data(4)[2] != 1 then
endi
# v1_dnode
$hasleader = 0
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 20 then
print ====> dnode not ready!
return -1
endi
sql show db.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08
if $data(2)[3] != 4 then
...
...
@@ -89,6 +98,18 @@ endi
if $data(2)[7] != 2 then
return -1
endi
if $data(2)[4] == leader then
$hasleader = 1
endi
if $data(2)[6] == leader then
$hasleader = 1
endi
if $data(2)[8] == leader then
$hasleader = 1
endi
if $hasleader != 1 then
goto step2
endi
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102")
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录