提交 6160aa6e 编写于 作者: S Shengliang Guan

mgmt queue

上级 bb3a0b21
...@@ -23,15 +23,16 @@ extern "C" { ...@@ -23,15 +23,16 @@ extern "C" {
int32_t dndInitDnode(SDnode *pDnode); int32_t dndInitDnode(SDnode *pDnode);
void dndCleanupDnode(SDnode *pDnode); void dndCleanupDnode(SDnode *pDnode);
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnode); int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode); int64_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
void dndSendStatusMsg(SDnode *pDnode);
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -59,18 +59,20 @@ typedef struct { ...@@ -59,18 +59,20 @@ typedef struct {
} SDnodeDir; } SDnodeDir;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
int64_t clusterId; int64_t clusterId;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int8_t statusSent; int8_t statusSent;
SEpSet mnodeEpSet; SEpSet mnodeEpSet;
char *file; char *file;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
...@@ -86,8 +88,6 @@ typedef struct { ...@@ -86,8 +88,6 @@ typedef struct {
taos_queue pReadQ; taos_queue pReadQ;
taos_queue pWriteQ; taos_queue pWriteQ;
taos_queue pSyncQ; taos_queue pSyncQ;
taos_queue pMgmtQ;
SWorkerPool mgmtPool;
SWorkerPool readPool; SWorkerPool readPool;
SWorkerPool writePool; SWorkerPool writePool;
SWorkerPool syncPool; SWorkerPool syncPool;
......
...@@ -23,11 +23,14 @@ extern "C" { ...@@ -23,11 +23,14 @@ extern "C" {
int32_t dndInitMnode(SDnode *pDnode); int32_t dndInitMnode(SDnode *pDnode);
void dndCleanupMnode(SDnode *pDnode); void dndCleanupMnode(SDnode *pDnode);
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -18,6 +18,21 @@ ...@@ -18,6 +18,21 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
static int32_t dndInitMgmtWorker(SDnode *pDnode);
static void dndCleanupMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMgmtQueue(SDnode *pDnode);
static void dndFreeMgmtQueue(SDnode *pDnode);
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndReadDnodes(SDnode *pDnode);
static int32_t dndWriteDnodes(SDnode *pDnode);
static void *dnodeThreadRoutine(void *param);
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg);
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg);
int32_t dndGetDnodeId(SDnode *pDnode) { int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
...@@ -164,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) { ...@@ -164,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 *1024; int32_t maxLen = 256 * 1024;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
FILE *fp = NULL; FILE *fp = NULL;
...@@ -302,7 +317,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { ...@@ -302,7 +317,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
} }
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 256 *1024; int32_t maxLen = 256 * 1024;
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
...@@ -409,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { ...@@ -409,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
taosWUnLockLatch(&pMgmt->latch); taosWUnLockLatch(&pMgmt->latch);
} }
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
return; return;
...@@ -443,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -443,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }
static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); }
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); }
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
dError("config msg is received, but not supported yet"); dError("config msg is received, but not supported yet");
...@@ -456,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -456,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
} }
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("startup msg is received"); dDebug("startup msg is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
...@@ -490,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -490,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) {
pMgmt->rebootTime = taosGetTimestampMs(); pMgmt->rebootTime = taosGetTimestampMs();
pMgmt->dropped = 0; pMgmt->dropped = 0;
pMgmt->clusterId = 0; pMgmt->clusterId = 0;
taosInitRWLatch(&pMgmt->latch);
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode);
...@@ -511,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -511,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) {
return -1; return -1;
} }
taosInitRWLatch(&pMgmt->latch); if (dndInitMgmtWorker(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
...@@ -527,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) { ...@@ -527,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) {
void dndCleanupDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupMgmtWorker(pDnode);
dndFreeMgmtQueue(pDnode);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL; pMgmt->threadId = NULL;
...@@ -553,39 +576,105 @@ void dndCleanupDnode(SDnode *pDnode) { ...@@ -553,39 +576,105 @@ void dndCleanupDnode(SDnode *pDnode) {
dInfo("dnode-dnode is cleaned up"); dInfo("dnode-dnode is cleaned up");
} }
void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static int32_t dndInitMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "dnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("dnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMgmtWorker(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("dnode mgmt worker is closed");
}
static int32_t dndAllocMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMgmtQueue(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pDnode, pEpSet);
}
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = dndProcessCreateMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = dndProcessAlterMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = dndProcessDropMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_NETWORK_TEST: case TDMT_DND_NETWORK_TEST:
dndProcessStartupReq(pDnode, pMsg); dndProcessStartupReq(pDnode, pMsg);
break; break;
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
dndProcessConfigDnodeReq(pDnode, pMsg); dndProcessConfigDnodeReq(pDnode, pMsg);
break; break;
default:
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) {
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
dndProcessStatusRsp(pDnode, pMsg, pEpSet); dndProcessStatusRsp(pDnode, pMsg);
break; break;
case TDMT_MND_AUTH_RSP: case TDMT_MND_AUTH_RSP:
dndProcessAuthRsp(pDnode, pMsg, pEpSet); dndProcessAuthRsp(pDnode, pMsg);
break; break;
case TDMT_MND_GRANT_RSP: case TDMT_MND_GRANT_RSP:
dndProcessGrantRsp(pDnode, pMsg, pEpSet); dndProcessGrantRsp(pDnode, pMsg);
break; break;
default: default:
dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType));
break;
}
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle};
rpcSendResponse(&rsp);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
taosFreeQitem(pMsg);
} }
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
static int32_t dndInitMnodeReadWorker(SDnode *pDnode); static int32_t dndInitMnodeReadWorker(SDnode *pDnode);
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); static int32_t dndInitMnodeWriteWorker(SDnode *pDnode);
static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); static int32_t dndInitMnodeSyncWorker(SDnode *pDnode);
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode);
static void dndCleanupMnodeReadWorker(SDnode *pDnode); static void dndCleanupMnodeReadWorker(SDnode *pDnode);
static void dndCleanupMnodeWriteWorker(SDnode *pDnode); static void dndCleanupMnodeWriteWorker(SDnode *pDnode);
static void dndCleanupMnodeSyncWorker(SDnode *pDnode); static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
...@@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); ...@@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode);
static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode);
static void dndFreeMnodeWriteQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode);
static void dndFreeMnodeSyncQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode);
...@@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode); ...@@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode);
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndStartMnodeWorker(SDnode *pDnode); static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void dndStopMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode);
...@@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); ...@@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode); static int32_t dndDropMnode(SDnode *pDnode);
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
static SMnode *dndAcquireMnode(SDnode *pDnode) { static SMnode *dndAcquireMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = NULL; SMnode *pMnode = NULL;
...@@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { ...@@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
return pMsg; return pMsg;
} }
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
...@@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
} }
static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
...@@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return dndWriteMnodeFile(pDnode); return dndWriteMnodeFile(pDnode);
} }
static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDropMnodeInMsg *pMsg = pRpcMsg->pCont; SDropMnodeInMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId); pMsg->dnodeId = htonl(pMsg->dnodeId);
...@@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { ...@@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
} }
} }
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TDMT_DND_CREATE_MNODE:
code = dndProcessCreateMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_ALTER_MNODE:
code = dndProcessAlterMnodeReq(pDnode, pMsg);
break;
case TDMT_DND_DROP_MNODE:
code = dndProcessDropMnodeReq(pDnode, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
break;
}
if (pMsg->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs ...@@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs
return 0; return 0;
} }
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg != NULL) *pMsg = *pRpcMsg;
if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
taosFreeQitem(pMsg);
}
dndReleaseMnode(pDnode, pMnode);
}
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
...@@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} }
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
pMgmt->pMgmtQ = NULL;
}
static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->mgmtPool;
pPool->name = "mnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
dDebug("mnode mgmt worker is initialized");
return 0;
}
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("mnode mgmt worker is closed");
}
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) { ...@@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosInitRWLatch(&pMgmt->latch); taosInitRWLatch(&pMgmt->latch);
if (dndInitMnodeMgmtWorker(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (dndAllocMnodeMgmtQueue(pDnode) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
char path[PATH_MAX]; char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode);
pMgmt->file = strdup(path); pMgmt->file = strdup(path);
...@@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) { ...@@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) {
dInfo("dnode-mnode start to clean up"); dInfo("dnode-mnode start to clean up");
if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode);
dndFreeMnodeMgmtQueue(pDnode);
tfree(pMgmt->file); tfree(pMgmt->file);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up"); dInfo("dnode-mnode is cleaned up");
......
...@@ -83,7 +83,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -83,7 +83,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
// message from client to dnode // message from client to dnode
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
// message from mnode to vnode // message from mnode to vnode
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
...@@ -106,24 +106,24 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -106,24 +106,24 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
// message from dnode to mnode // message from dnode to mnode
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
...@@ -191,7 +191,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -191,7 +191,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pMsg->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) { if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessDnodeReq(pDnode, pMsg, pEpSet); dndProcessStartupReq(pDnode, pMsg);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册