diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 7e969baf37ab17b0faf3f1f8df3b1138ae675865..260d238c65293c902568ca145bcf94d7c8ef1781 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -24,8 +24,8 @@ extern "C" { int32_t mndInitMnode(); void mndCleanupMnode(); -void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); -void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect); +void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); +void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 7d7f9d69d0ce6b0d7d732ab5aca60d1aa0020418..af091882a34959a427fead0a46e46fff921b8b08 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -24,9 +24,8 @@ extern "C" { int32_t mndInitSync(); void mndCleanupSync(); -int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData); - -bool mnodeIsMaster(); +bool mndIsMaster(); +int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index ce38049a36b37476ff77853dae8e685e355758f2..66df205f6eefd2a127d65cbade3b3e80cb0279be 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -20,5 +20,5 @@ int32_t mndInitMnode() { return 0; } void mndCleanupMnode() {} -void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} -void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} \ No newline at end of file +void mndGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} +void mndGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 8db79e1e32fd13053ad9521b41b38ff6c729f195..89098e7ec047a85833227703aed336762926e0ea 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -21,10 +21,10 @@ int32_t mndInitSync() { return 0; } void mndCleanupSync() {} -int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) { +int32_t mndSyncPropose(SSdbRaw *pRaw, void *pData) { trnApply(pData, pData, 0); free(pData); return 0; } -bool mnodeIsMaster() { return true; } \ No newline at end of file +bool mndIsMaster() { return true; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index e9748ec12ec3af5d6436599ede8cf2f41d6912f3..f5f5464822f5533131c33ea446c9b2b8622b1d0f 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -36,9 +36,9 @@ static struct { char email[TSDB_FQDN_LEN]; } tsTelem; -static void mnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } +static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } -static void mnodeCloseObject(SBufferWriter* bw) { +static void mndCloseObject(SBufferWriter* bw) { size_t len = tbufTell(bw); if (tbufGetData(bw, false)[len - 1] == ',') { tbufWriteCharAt(bw, len - 1, '}'); @@ -64,14 +64,14 @@ static void closeArray(SBufferWriter* bw) { } #endif -static void mnodeWriteString(SBufferWriter* bw, const char* str) { +static void mndWriteString(SBufferWriter* bw, const char* str) { tbufWriteChar(bw, '"'); tbufWrite(bw, str, strlen(str)); tbufWriteChar(bw, '"'); } -static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) { - mnodeWriteString(bw, k); +static void mndAddIntField(SBufferWriter* bw, const char* k, int64_t v) { + mndWriteString(bw, k); tbufWriteChar(bw, ':'); char buf[32]; sprintf(buf, "%" PRId64, v); @@ -79,14 +79,14 @@ static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) { tbufWriteChar(bw, ','); } -static void mnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) { - mnodeWriteString(bw, k); +static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) { + mndWriteString(bw, k); tbufWriteChar(bw, ':'); - mnodeWriteString(bw, v); + mndWriteString(bw, v); tbufWriteChar(bw, ','); } -static void mnodeAddCpuInfo(SBufferWriter* bw) { +static void mndAddCpuInfo(SBufferWriter* bw) { char* line = NULL; size_t size = 0; int32_t done = 0; @@ -100,11 +100,11 @@ static void mnodeAddCpuInfo(SBufferWriter* bw) { line[size - 1] = '\0'; if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) { const char* v = strchr(line, ':') + 2; - mnodeAddStringField(bw, "cpuModel", v); + mndAddStringField(bw, "cpuModel", v); done |= 1; } else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) { const char* v = strchr(line, ':') + 2; - mnodeWriteString(bw, "numOfCpu"); + mndWriteString(bw, "numOfCpu"); tbufWriteChar(bw, ':'); tbufWrite(bw, v, strlen(v)); tbufWriteChar(bw, ','); @@ -116,7 +116,7 @@ static void mnodeAddCpuInfo(SBufferWriter* bw) { fclose(fp); } -static void mnodeAddOsInfo(SBufferWriter* bw) { +static void mndAddOsInfo(SBufferWriter* bw) { char* line = NULL; size_t size = 0; @@ -133,7 +133,7 @@ static void mnodeAddOsInfo(SBufferWriter* bw) { p++; line[size - 2] = 0; } - mnodeAddStringField(bw, "os", p); + mndAddStringField(bw, "os", p); break; } } @@ -142,7 +142,7 @@ static void mnodeAddOsInfo(SBufferWriter* bw) { fclose(fp); } -static void mnodeAddMemoryInfo(SBufferWriter* bw) { +static void mndAddMemoryInfo(SBufferWriter* bw) { char* line = NULL; size_t size = 0; @@ -156,7 +156,7 @@ static void mnodeAddMemoryInfo(SBufferWriter* bw) { if (strncmp(line, "MemTotal", 8) == 0) { const char* p = strchr(line, ':') + 1; while (*p == ' ') p++; - mnodeAddStringField(bw, "memory", p); + mndAddStringField(bw, "memory", p); break; } } @@ -165,32 +165,32 @@ static void mnodeAddMemoryInfo(SBufferWriter* bw) { fclose(fp); } -static void mnodeAddVersionInfo(SBufferWriter* bw) { - mnodeAddStringField(bw, "version", version); - mnodeAddStringField(bw, "buildInfo", buildinfo); - mnodeAddStringField(bw, "gitInfo", gitinfo); - mnodeAddStringField(bw, "email", tsTelem.email); +static void mndAddVersionInfo(SBufferWriter* bw) { + mndAddStringField(bw, "version", version); + mndAddStringField(bw, "buildInfo", buildinfo); + mndAddStringField(bw, "gitInfo", gitinfo); + mndAddStringField(bw, "email", tsTelem.email); } -static void mnodeAddRuntimeInfo(SBufferWriter* bw) { +static void mndAddRuntimeInfo(SBufferWriter* bw) { SMnodeLoad load = {0}; if (mndGetLoad(NULL, &load) != 0) { return; } - mnodeAddIntField(bw, "numOfDnode", load.numOfDnode); - mnodeAddIntField(bw, "numOfMnode", load.numOfMnode); - mnodeAddIntField(bw, "numOfVgroup", load.numOfVgroup); - mnodeAddIntField(bw, "numOfDatabase", load.numOfDatabase); - mnodeAddIntField(bw, "numOfSuperTable", load.numOfSuperTable); - mnodeAddIntField(bw, "numOfChildTable", load.numOfChildTable); - mnodeAddIntField(bw, "numOfColumn", load.numOfColumn); - mnodeAddIntField(bw, "numOfPoint", load.totalPoints); - mnodeAddIntField(bw, "totalStorage", load.totalStorage); - mnodeAddIntField(bw, "compStorage", load.compStorage); + mndAddIntField(bw, "numOfDnode", load.numOfDnode); + mndAddIntField(bw, "numOfMnode", load.numOfMnode); + mndAddIntField(bw, "numOfVgroup", load.numOfVgroup); + mndAddIntField(bw, "numOfDatabase", load.numOfDatabase); + mndAddIntField(bw, "numOfSuperTable", load.numOfSuperTable); + mndAddIntField(bw, "numOfChildTable", load.numOfChildTable); + mndAddIntField(bw, "numOfColumn", load.numOfColumn); + mndAddIntField(bw, "numOfPoint", load.totalPoints); + mndAddIntField(bw, "totalStorage", load.totalStorage); + mndAddIntField(bw, "compStorage", load.compStorage); } -static void mnodeSendTelemetryReport() { +static void mndSendTelemetryReport() { char buf[128] = {0}; uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); if (ip == 0xffffffff) { @@ -208,15 +208,15 @@ static void mnodeSendTelemetryReport() { snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId); SBufferWriter bw = tbufInitWriter(NULL, false); - mnodeBeginObject(&bw); - mnodeAddStringField(&bw, "instanceId", clusterIdStr); - mnodeAddIntField(&bw, "reportVersion", 1); - mnodeAddOsInfo(&bw); - mnodeAddCpuInfo(&bw); - mnodeAddMemoryInfo(&bw); - mnodeAddVersionInfo(&bw); - mnodeAddRuntimeInfo(&bw); - mnodeCloseObject(&bw); + mndBeginObject(&bw); + mndAddStringField(&bw, "instanceId", clusterIdStr); + mndAddIntField(&bw, "reportVersion", 1); + mndAddOsInfo(&bw); + mndAddCpuInfo(&bw); + mndAddMemoryInfo(&bw); + mndAddVersionInfo(&bw); + mndAddRuntimeInfo(&bw); + mndCloseObject(&bw); const char* header = "POST /report HTTP/1.1\n" @@ -240,12 +240,12 @@ static void mnodeSendTelemetryReport() { taosCloseSocket(fd); } -static void* mnodeTelemThreadFp(void* param) { +static void* mndTelemThreadFp(void* param) { struct timespec end = {0}; clock_gettime(CLOCK_REALTIME, &end); end.tv_sec += 300; // wait 5 minutes before send first report - setThreadName("mnode-telem"); + setThreadName("mnd-telem"); while (!tsTelem.exit) { int32_t r = 0; @@ -256,8 +256,8 @@ static void* mnodeTelemThreadFp(void* param) { if (r == 0) break; if (r != ETIMEDOUT) continue; - if (mnodeIsMaster()) { - mnodeSendTelemetryReport(); + if (mndIsMaster()) { + mndSendTelemetryReport(); } end.tv_sec += REPORT_INTERVAL; } @@ -265,7 +265,7 @@ static void* mnodeTelemThreadFp(void* param) { return NULL; } -static void mnodeGetEmail(char* filepath) { +static void mndGetEmail(char* filepath) { int32_t fd = taosOpenFileRead(filepath); if (fd < 0) { return; @@ -287,19 +287,19 @@ int32_t mndInitTelem() { pthread_cond_init(&tsTelem.cond, NULL); tsTelem.email[0] = 0; - mnodeGetEmail("/usr/local/taos/email"); + mndGetEmail("/usr/local/taos/email"); pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - int32_t code = pthread_create(&tsTelem.thread, &attr, mnodeTelemThreadFp, NULL); + int32_t code = pthread_create(&tsTelem.thread, &attr, mndTelemThreadFp, NULL); pthread_attr_destroy(&attr); if (code != 0) { mTrace("failed to create telemetry thread since :%s", strerror(code)); } - mInfo("mnode telemetry is initialized"); + mInfo("mnd telemetry is initialized"); return 0; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index a51133d4276f8681d45dd156f94bb3ff69a8b3a2..32c2c459e5f6cf85db75a2e85c69d8bc678559b9 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -15,14 +15,12 @@ #define _DEFAULT_SOURCE #include "mndSync.h" -#include "os.h" -#include "tglobal.h" #include "tkey.h" #include "mndTrans.h" #define SDB_USER_VER 1 -static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { +static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj)); if (pRaw == NULL) return NULL; @@ -38,7 +36,7 @@ static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { return pRaw; } -static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) { +static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; @@ -62,7 +60,7 @@ static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) { return pRow; } -static int32_t mnodeUserActionInsert(SUserObj *pUser) { +static int32_t mndUserActionInsert(SUserObj *pUser) { pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pUser->prohibitDbHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -78,7 +76,7 @@ static int32_t mnodeUserActionInsert(SUserObj *pUser) { return 0; } -static int32_t mnodeUserActionDelete(SUserObj *pUser) { +static int32_t mndUserActionDelete(SUserObj *pUser) { if (pUser->prohibitDbHash) { taosHashCleanup(pUser->prohibitDbHash); pUser->prohibitDbHash = NULL; @@ -92,14 +90,14 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) { return 0; } -static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { +static int32_t mndUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { SUserObj tObj; int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj); memcpy(pDstUser, pSrcUser, len); return 0; } -static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { +static int32_t mndCreateDefaultUser(char *acct, char *user, char *pass) { SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -111,30 +109,26 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { userObj.rootAuth = 1; } - SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); + SSdbRaw *pRaw = mndUserActionEncode(&userObj); if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); return sdbWrite(pRaw); } -static int32_t mnodeCreateDefaultUsers() { - if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { +static int32_t mndCreateDefaultUsers() { + if (mndCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } - if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass) != 0) { - return -1; - } - - if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass) != 0) { + if (mndCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } return 0; } -static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) { +static int32_t mndCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pMsg) { SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -146,7 +140,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) return -1; - SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); + SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { mError("failed to append redo log since %s", terrstr()); trnDrop(pTrans); @@ -154,7 +148,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM } sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); - SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); + SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj); if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { mError("failed to append undo log since %s", terrstr()); trnDrop(pTrans); @@ -162,7 +156,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM } sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); - SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); + SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj); if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { mError("failed to append commit log since %s", terrstr()); trnDrop(pTrans); @@ -170,7 +164,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { + if (trnPrepare(pTrans, mndSyncPropose) != 0) { trnDrop(pTrans); return -1; } @@ -179,7 +173,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM return 0; } -static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { +static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; if (pCreate->user[0] == 0) { @@ -209,7 +203,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return -1; } - int32_t code = mnodeCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg); + int32_t code = mndCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg); sdbRelease(pOperUser); if (code != 0) { @@ -223,15 +217,15 @@ static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { int32_t mndInitUser() { SSdbTable table = {.sdbType = SDB_USER, .keyType = SDB_KEY_BINARY, - .deployFp = (SdbDeployFp)mnodeCreateDefaultUsers, - .encodeFp = (SdbEncodeFp)mnodeUserActionEncode, - .decodeFp = (SdbDecodeFp)mnodeUserActionDecode, - .insertFp = (SdbInsertFp)mnodeUserActionInsert, - .updateFp = (SdbUpdateFp)mnodeUserActionUpdate, - .deleteFp = (SdbDeleteFp)mnodeUserActionDelete}; + .deployFp = (SdbDeployFp)mndCreateDefaultUsers, + .encodeFp = (SdbEncodeFp)mndUserActionEncode, + .decodeFp = (SdbDecodeFp)mndUserActionDecode, + .insertFp = (SdbInsertFp)mndUserActionInsert, + .updateFp = (SdbUpdateFp)mndUserActionUpdate, + .deleteFp = (SdbDeleteFp)mndUserActionDelete}; sdbSetTable(table); - mndSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg); + mndSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mndProcessCreateUserMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 426c86f728031749786c97d24529135737470f27..369994c2bb2771f4782188054c889d629c9e1077 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -278,28 +278,44 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {} static void mndProcessRpcMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; + int32_t code = 0; + int32_t msgType = pMsg->rpcMsg.msgType; + void *ahandle = pMsg->rpcMsg.ahandle; + bool isReq = (msgType % 2 == 1); - if (!mnodeIsMaster(pMnode)) { - mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); - mndCleanupMsg(pMsg); - return; + if (isReq && !mndIsMaster(pMnode)) { + code = TSDB_CODE_APP_NOT_READY; + goto PROCESS_RPC_END; + } + + if (isReq && pMsg->rpcMsg.pCont == NULL) { + mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MND_INVALID_MSG_LEN; + goto PROCESS_RPC_END; } - int32_t msgType = pMsg->rpcMsg.msgType; MndMsgFp fp = pMnode->msgFp[msgType]; if (fp == NULL) { - mError("RPC %p, req:%s is not processed", pMsg->rpcMsg.handle, taosMsg[msgType]); - SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; - rpcSendResponse(&rspMsg); - mndCleanupMsg(pMsg); - return; + mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); + code = TSDB_CODE_MSG_NOT_PROCESSED; + goto PROCESS_RPC_END; } - int32_t code = (*fp)(pMnode, pMsg); + code = (*fp)(pMnode, pMsg); if (code != 0) { - mError("RPC %p, req:%s processed error since %s", pMsg->rpcMsg.handle, taosMsg[msgType], tstrerror(code)); - SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; - rpcSendResponse(&rspMsg); + mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], tstrerror(code)); + goto PROCESS_RPC_END; + } + +PROCESS_RPC_END: + if (isReq) { + if (code == TSDB_CODE_APP_NOT_READY) { + mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); + } else if (code != 0) { + SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code}; + rpcSendResponse(&rspMsg); + } else { + } } mndCleanupMsg(pMsg); @@ -318,139 +334,3 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } void mndProcessApplyMsg(SMnodeMsg *pMsg) {} - -#if 0 - -static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_WRITE_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - mnodeGetMnodeEpSetForShell(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, - taosMsg[msgType], epSet->numOfEps, epSet->inUse); - - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_WRITE_REQ_END; - } - - if (tsMworker.writeMsgFp[msgType] == NULL) { - mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_WRITE_REQ_END; - } - - code = (*tsMworker.writeMsgFp[msgType])(pMsg); - -PROCESS_WRITE_REQ_END: - mndSendRsp(pMsg, code); -} - -static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_READ_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - if (!epSet) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto PROCESS_READ_REQ_END; - } - mnodeGetMnodeEpSetForShell(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], - epSet->numOfEps, epSet->inUse); - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_READ_REQ_END; - } - - if (tsMworker.readMsgFp[msgType] == NULL) { - mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_READ_REQ_END; - } - - mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]); - code = (*tsMworker.readMsgFp[msgType])(pMsg); - -PROCESS_READ_REQ_END: - mndSendRsp(pMsg, code); -} - -static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; - int32_t code = 0; - - if (pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_INVALID_MSG_LEN; - goto PROCESS_PEER_REQ_END; - } - - if (!mnodeIsMaster()) { - SMnodeRsp *rpcRsp = &pMsg->rpcRsp; - SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); - mnodeGetMnodeEpSetForPeer(epSet, true); - rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SEpSet); - - mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, - taosMsg[msgType], epSet->numOfEps, epSet->inUse); - - code = TSDB_CODE_RPC_REDIRECT; - goto PROCESS_PEER_REQ_END; - } - - if (tsMworker.peerReqFp[msgType] == NULL) { - mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]); - code = TSDB_CODE_MND_MSG_NOT_PROCESSED; - goto PROCESS_PEER_REQ_END; - } - - code = (*tsMworker.peerReqFp[msgType])(pMsg); - -PROCESS_PEER_REQ_END: - mndSendRsp(pMsg, code); -} - -static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) { - int32_t msgType = pMsg->rpcMsg.msgType; - SRpcMsg *pRpcMsg = &pMsg->rpcMsg; - - if (!mnodeIsMaster()) { - mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); - mndCleanupMsg2(pMsg); - } - - if (tsMworker.peerRspFp[msgType]) { - (*tsMworker.peerRspFp[msgType])(pRpcMsg); - } else { - mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]); - } - - mndCleanupMsg2(pMsg); -} -#endif \ No newline at end of file