提交 c4306958 编写于 作者: S Shengliang Guan

TD-11265 fix transport in dnode-mgmt

上级 c3a25985
...@@ -137,38 +137,34 @@ void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code); ...@@ -137,38 +137,34 @@ void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code);
/** /**
* @brief Process the read request * @brief Process the read request
* *
* @param pMnode The mnode object
* @param pMsg The request msg * @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg); void mnodeProcessReadMsg(SMnodeMsg *pMsg);
/** /**
* @brief Process the write request * @brief Process the write request
* *
* @param pMnode The mnode object
* @param pMsg The request msg * @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg); void mnodeProcessWriteMsg(SMnodeMsg *pMsg);
/** /**
* @brief Process the sync request * @brief Process the sync request
* *
* @param pMnode The mnode object
* @param pMsg The request msg * @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg); void mnodeProcessSyncMsg(SMnodeMsg *pMsg);
/** /**
* @brief Process the apply request * @brief Process the apply request
* *
* @param pMnode The mnode object
* @param pMsg The request msg * @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg); void mnodeProcessApplyMsg(SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -121,12 +121,14 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { ...@@ -121,12 +121,14 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
} }
pMgmt->mnodeEpSet.inUse = 0; pMgmt->mnodeEpSet.inUse = 0;
pMgmt->mnodeEpSet.numOfEps = 0;
int32_t mIndex = 0; int32_t mIndex = 0;
for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) {
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
if (!pDnodeEp->isMnode) continue; if (!pDnodeEp->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue; if (mIndex >= TSDB_MAX_REPLICA) continue;
pMgmt->mnodeEpSet.numOfEps++;
strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn); strcpy(pMgmt->mnodeEpSet.fqdn[mIndex], pDnodeEp->fqdn);
pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port; pMgmt->mnodeEpSet.port[mIndex] = pDnodeEp->port;
mIndex++; mIndex++;
...@@ -279,6 +281,7 @@ PRASE_DNODE_OVER: ...@@ -279,6 +281,7 @@ PRASE_DNODE_OVER:
if (pMgmt->dnodeEps == NULL) { if (pMgmt->dnodeEps == NULL) {
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
pMgmt->dnodeEps->num = 1; pMgmt->dnodeEps->num = 1;
pMgmt->dnodeEps->eps[0].isMnode = 1;
pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort; pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort;
tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
} }
......
...@@ -80,7 +80,9 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { ...@@ -80,7 +80,9 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
} }
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("acquire mnode, refCount:%d", refCount); if (pMnode != NULL) {
dTrace("acquire mnode, refCount:%d", refCount);
}
return pMnode; return pMnode;
} }
...@@ -94,7 +96,9 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { ...@@ -94,7 +96,9 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
} }
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("release mnode, refCount:%d", refCount); if (pMnode != NULL) {
dTrace("release mnode, refCount:%d", refCount);
}
} }
static int32_t dndReadMnodeFile(SDnode *pDnode) { static int32_t dndReadMnodeFile(SDnode *pDnode) {
...@@ -550,7 +554,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -550,7 +554,7 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mnodeProcessReadMsg(pMnode, pMsg); mnodeProcessReadMsg(pMsg);
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} else { } else {
mnodeSendRsp(pMsg, terrno); mnodeSendRsp(pMsg, terrno);
...@@ -564,7 +568,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -564,7 +568,7 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mnodeProcessWriteMsg(pMnode, pMsg); mnodeProcessWriteMsg(pMsg);
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} else { } else {
mnodeSendRsp(pMsg, terrno); mnodeSendRsp(pMsg, terrno);
...@@ -578,7 +582,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -578,7 +582,7 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mnodeProcessApplyMsg(pMnode, pMsg); mnodeProcessApplyMsg(pMsg);
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} else { } else {
mnodeSendRsp(pMsg, terrno); mnodeSendRsp(pMsg, terrno);
...@@ -592,7 +596,7 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -592,7 +596,7 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
mnodeProcessSyncMsg(pMnode, pMsg); mnodeProcessSyncMsg(pMsg);
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} else { } else {
mnodeSendRsp(pMsg, terrno); mnodeSendRsp(pMsg, terrno);
...@@ -683,7 +687,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -683,7 +687,7 @@ static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, NULL, (FProcessItem)dndProcessMnodeMgmtQueue); pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
if (pMgmt->pMgmtQ == NULL) { if (pMgmt->pMgmtQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -708,18 +712,19 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { ...@@ -708,18 +712,19 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("mnode mgmt worker is initialized");
return 0; return 0;
} }
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->mgmtPool); tWorkerCleanup(&pMgmt->mgmtPool);
dDebug("mnode mgmt worker is stopped"); dDebug("mnode mgmt worker is closed");
} }
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, NULL, (FProcessItem)dndProcessMnodeReadQueue); pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue);
if (pMgmt->pReadQ == NULL) { if (pMgmt->pReadQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -745,18 +750,19 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { ...@@ -745,18 +750,19 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("mnode read worker is initialized");
return 0; return 0;
} }
static void dndCleanupMnodeReadWorker(SDnode *pDnode) { static void dndCleanupMnodeReadWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->readPool); tWorkerCleanup(&pMgmt->readPool);
dDebug("mnode read worker is stopped"); dDebug("mnode read worker is closed");
} }
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeWriteQueue); pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue);
if (pMgmt->pWriteQ == NULL) { if (pMgmt->pWriteQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -773,7 +779,7 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) { ...@@ -773,7 +779,7 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, NULL, (FProcessItem)dndProcessMnodeApplyQueue); pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue);
if (pMgmt->pApplyQ == NULL) { if (pMgmt->pApplyQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -799,18 +805,19 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { ...@@ -799,18 +805,19 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("mnode write worker is initialized");
return 0; return 0;
} }
static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { static void dndCleanupMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->writePool); tWorkerCleanup(&pMgmt->writePool);
dDebug("mnode write worker is stopped"); dDebug("mnode write worker is closed");
} }
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, NULL, (FProcessItem)dndProcessMnodeSyncQueue); pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue);
if (pMgmt->pSyncQ == NULL) { if (pMgmt->pSyncQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -836,13 +843,14 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { ...@@ -836,13 +843,14 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
return -1; return -1;
} }
dDebug("mnode sync worker is initialized");
return 0; return 0;
} }
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerCleanup(&pMgmt->syncPool); tWorkerCleanup(&pMgmt->syncPool);
dDebug("mnode sync worker is stopped"); dDebug("mnode sync worker is closed");
} }
int32_t dndInitMnode(SDnode *pDnode) { int32_t dndInitMnode(SDnode *pDnode) {
......
...@@ -261,12 +261,12 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -261,12 +261,12 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SDnode *pDnode = parent; SDnode *pDnode = parent;
if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) { if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get internal auth success"); // dTrace("get internal auth success");
return 0; return 0;
} }
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) {
dTrace("get auth from internal mnode"); // dTrace("get auth from internal mnode");
return 0; return 0;
} }
...@@ -275,7 +275,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -275,7 +275,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return -1; return -1;
} }
dDebug("user:%s, send auth msg to other mnodes", user); // dDebug("user:%s, send auth msg to other mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN); tstrncpy(pMsg->user, user, TSDB_USER_LEN);
......
...@@ -234,13 +234,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { ...@@ -234,13 +234,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
} }
} }
void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } void mnodeProcessReadMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } void mnodeProcessWriteMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); } void mnodeProcessSyncMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg) {} void mnodeProcessApplyMsg(SMnodeMsg *pMsg) {}
#if 0 #if 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册