提交 8420c3aa 编写于 作者: S Shengliang Guan

remove mgmtq

上级 6160aa6e
......@@ -98,8 +98,6 @@ typedef struct {
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
......
......@@ -24,12 +24,18 @@ extern "C" {
int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -661,6 +661,24 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
case TDMT_MND_GRANT_RSP:
dndProcessGrantRsp(pDnode, pMsg);
break;
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_AUTH_VNODE:
code = dndProcessAuthVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
......
......@@ -94,17 +94,17 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg;
// message from mnode to dnode
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
......
......@@ -56,11 +56,9 @@ typedef struct {
static int32_t dndInitVnodeReadWorker(SDnode *pDnode);
static int32_t dndInitVnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitVnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode);
static void dndCleanupVnodeReadWorker(SDnode *pDnode);
static void dndCleanupVnodeWriteWorker(SDnode *pDnode);
static void dndCleanupVnodeSyncWorker(SDnode *pDnode);
static void dndCleanupVnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode);
......@@ -77,12 +75,10 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg);
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs);
static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
......@@ -96,13 +92,6 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode);
static int32_t dndOpenVnodes(SDnode *pDnode);
static void dndCloseVnodes(SDnode *pDnode);
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj *pVnode = NULL;
......@@ -600,7 +589,7 @@ static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
return pAuth;
}
static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
......@@ -641,7 +630,7 @@ static int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0;
}
static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg);
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
......@@ -680,7 +669,7 @@ static int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return code;
}
static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t vgId = pDrop->vgId;
......@@ -707,7 +696,7 @@ static int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0;
}
static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0;
......@@ -725,7 +714,7 @@ static int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0;
}
static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t vgId = pAuth->vgId;
......@@ -747,7 +736,7 @@ static int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0;
}
static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t vgId = pCompact->vgId;
......@@ -769,39 +758,6 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return 0;
}
static void dndProcessVnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TDMT_DND_CREATE_VNODE:
code = dndProcessCreateVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = dndProcessAlterVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_AUTH_VNODE:
code = dndProcessAuthVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = dndProcessCompactVnodeReq(pDnode, pMsg);
break;
default:
code = TSDB_CODE_MSG_NOT_PROCESSED;
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) {
SRpcMsg *pRsp = NULL;
vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
......@@ -909,11 +865,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
return pVnode;
}
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
dndWriteRpcMsgToVnodeQueue(pMgmt->pMgmtQ, pMsg);
}
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) {
......@@ -957,35 +908,6 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMs
return code;
}
static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "vnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("vnode mgmt worker is initialized");
return 0;
}
static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
tWorkerCleanup(&pMgmt->mgmtPool);
pMgmt->pMgmtQ = NULL;
dDebug("vnode mgmt worker is closed");
}
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue);
......@@ -1167,11 +1089,6 @@ int32_t dndInitVnodes(SDnode *pDnode) {
return -1;
}
if (dndInitVnodeMgmtWorker(pDnode) != 0) {
dError("failed to init vnodes mgmt worker since %s", terrstr());
return -1;
}
if (dndOpenVnodes(pDnode) != 0) {
dError("failed to open vnodes since %s", terrstr());
return -1;
......@@ -1187,7 +1104,6 @@ void dndCleanupVnodes(SDnode *pDnode) {
dndCleanupVnodeReadWorker(pDnode);
dndCleanupVnodeWriteWorker(pDnode);
dndCleanupVnodeSyncWorker(pDnode);
dndCleanupVnodeMgmtWorker(pDnode);
dInfo("dnode-vnodes is cleaned up");
}
......
......@@ -28,14 +28,14 @@ Testbase DndTestCluster::test;
TEST_F(DndTestCluster, 01_ShowCluster) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, "");
CHECK_META( "show cluster", 3);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BIGINT, 8, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
IgnoreInt32();
IgnoreInt64();
IgnoreBinary(TSDB_CLUSTER_ID_LEN);
CheckTimestamp();
}
\ No newline at end of file
......@@ -844,7 +844,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
}
mError("trans:%d, exec finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec;
}
......
......@@ -123,6 +123,10 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
// remove attached object such as trans
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) (*deleteFp)(pSdb, pRow);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册