diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 7b8afa96bbb8815b8fe26e35e3b81cb9366bfc3d..20941847d310a5ff9e03e2565352c4d89d614f02 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -470,7 +470,7 @@ static void *dnodeThreadRoutine(void *param) { pthread_testcancel(); if (dndGetStat(pDnode) == DND_STAT_RUNNING) { - // dndSendStatusMsg(pDnode); + dndSendStatusMsg(pDnode); } } } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 342d14463ab51de85017c129fd3b4d440c2fdcc9..1f02a3799b4fb362184c0ee1ca32d9360c24d4d3 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -184,7 +184,7 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { } code = 0; - dInfo("succcessed to read file %s", pMgmt->file); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); PRASE_MNODE_OVER: if (content != NULL) free(content); @@ -241,7 +241,7 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { return -1; } - dInfo("successed to write %s", pMgmt->file); + dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); return 0; } @@ -396,29 +396,34 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = dndStartMnodeWorker(pDnode); - if (code != 0) { - dError("failed to start mnode worker since %s", terrstr()); - return code; - } - SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); if (pMnode == NULL) { dError("failed to open mnode since %s", terrstr()); + return -1; + } + pMgmt->deployed = 1; + + int32_t code = dndWriteMnodeFile(pDnode); + if (code != 0) { + dError("failed to write mnode file since %s", terrstr()); code = terrno; - dndStopMnodeWorker(pDnode); + pMgmt->deployed = 0; + mndClose(pMnode); + mndDestroy(pDnode->dir.mnode); terrno = code; - return code; + return -1; } - if (dndWriteMnodeFile(pDnode) != 0) { - dError("failed to write mnode file since %s", terrstr()); + code = dndStartMnodeWorker(pDnode); + if (code != 0) { + dError("failed to start mnode worker since %s", terrstr()); code = terrno; + pMgmt->deployed = 0; dndStopMnodeWorker(pDnode); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); terrno = code; - return code; + return -1; } taosWLockLatch(&pMgmt->latch); @@ -426,6 +431,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); + dInfo("mnode open successfully"); return 0; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ded66edaa7e3ada05f5bc7b1d3f61b7f9d05bf67..9a48d440dcfb7458aca8992e2239a35a239a577c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -299,25 +299,18 @@ typedef struct { char payload[]; } SShowObj; -typedef struct { - int32_t len; - void *rsp; -} SMnodeRsp; - typedef struct SMnodeMsg { + char user[TSDB_USER_LEN]; SMnode *pMnode; - void (*fp)(SMnodeMsg *pMsg, int32_t code); - SRpcConnInfo conn; - SUserObj *pUser; - int16_t received; - int16_t successed; - int16_t expected; - int16_t retry; - int32_t code; - int64_t createdTime; - SMnodeRsp rpcRsp; - SRpcMsg rpcMsg; - char pCont[]; + int16_t received; + int16_t successed; + int16_t expected; + int16_t retry; + int32_t code; + int64_t createdTime; + SRpcMsg rpcMsg; + int32_t contLen; + void *pCont; } SMnodeMsg; #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index ce570773bd82d0705ec875a0d9bfe377db12d21c..4d31a87b191761b7a027837d8b3ec9617faff1cf 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -22,8 +22,10 @@ extern "C" { #endif -int32_t mndInitUser(SMnode *pMnode); -void mndCleanupUser(SMnode *pMnode); +int32_t mndInitUser(SMnode *pMnode); +void mndCleanupUser(SMnode *pMnode); +SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName); +void mndReleaseUser(SMnode *pMnode, SUserObj *pUser); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 73b016742245d2d4259fc7974745e9f48f191da1..23328506e636e58b53a76ba2a7df277112952250 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -44,6 +44,7 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sver != SDB_ACCT_VER) { + mError("failed to decode acct since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } @@ -68,14 +69,26 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { return pRow; } -static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { return 0; } +static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { + mTrace("acct:%s, perform insert action", pAcct->acct); + memset(&pAcct->info, 0, sizeof(SAcctInfo)); + return 0; +} -static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { return 0; } +static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { + mTrace("acct:%s, perform delete action", pAcct->acct); + return 0; +} static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { - SAcctObj tObj; - int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj); - memcpy(pDstAcct, pSrcAcct, len); + mTrace("acct:%s, perform update action", pSrcAcct->acct); + + memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN); + pSrcAcct->createdTime = pDstAcct->createdTime; + pSrcAcct->updateTime = pDstAcct->updateTime; + pSrcAcct->acctId = pDstAcct->acctId; + pSrcAcct->status = pDstAcct->status; + pSrcAcct->cfg = pDstAcct->cfg; return 0; } @@ -98,6 +111,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("acct:%s, will be created while deploy sdb", acctObj.acct); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 68d4bc36c6b9910f9ebc69174c756a489c32aa90..2d06ee5c2d41364e0a15c1e801e1772090114173 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -107,8 +107,8 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj pSrcDnode->createdTime = pDstDnode->createdTime; pSrcDnode->updateTime = pDstDnode->updateTime; pSrcDnode->port = pDstDnode->port; - memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); - mnodeResetDnode(pSrcDnode); + memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); + return 0; } static int32_t mndCreateDefaultDnode(SMnode *pMnode) { @@ -211,8 +211,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { - SStatusMsg *pStatus = pMsg->rpcMsg.pCont; +static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->sver = htonl(pStatus->sver); pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->clusterId = htobe64(pStatus->clusterId); @@ -225,6 +224,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); +} + +static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + mndParseStatusMsg(pStatus); SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { @@ -305,8 +309,8 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { pRsp->dnodeCfg.clusterId = htobe64(clusterId); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); - pMsg->rpcRsp.len = contLen; - pMsg->rpcRsp.rsp = pRsp; + pMsg->contLen = contLen; + pMsg->pCont = pRsp; mndReleaseDnode(pMnode, pDnode); return 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 2863846e87f2c95888a1c61e86f06e2974aacea5..d2ace31a3661c9294fd57b1b9649d6d2cd05f0f5 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -63,6 +63,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id); if (pMnodeObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + mError("mnode:%d, failed to perform insert action since %s", pMnodeObj->id, terrstr()); return -1; } @@ -85,12 +86,12 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj pSrcMnode->id = pDstMnode->id; pSrcMnode->createdTime = pDstMnode->createdTime; pSrcMnode->updateTime = pDstMnode->updateTime; - mnodeResetMnode(pSrcMnode); + return 0; } static int32_t mndCreateDefaultMnode(SMnode *pMnode) { SMnodeObj mnodeObj = {0}; - mnodeObj.id = 0; + mnodeObj.id = 1; mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 4156d2ab37676346232ae5fb547ad09fc0dd417f..abbe41a60d43c88d2a93a3eddaa52bfcc8230493 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -15,8 +15,8 @@ #define _DEFAULT_SOURCE #include "mndSync.h" -#include "tkey.h" #include "mndTrans.h" +#include "tkey.h" #define SDB_USER_VER 1 @@ -41,6 +41,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sver != SDB_USER_VER) { + mError("failed to decode user since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } @@ -61,15 +62,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { } static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { + mTrace("user:%s, perform insert action", pUser->user); pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pUser->prohibitDbHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); return -1; } pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct); if (pUser->pAcct == NULL) { terrno = TSDB_CODE_MND_ACCT_NOT_EXIST; + mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); return -1; } @@ -77,12 +81,13 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { } static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { + mTrace("user:%s, perform delete action", pUser->user); if (pUser->prohibitDbHash) { taosHashCleanup(pUser->prohibitDbHash); pUser->prohibitDbHash = NULL; } - if (pUser->acct != NULL) { + if (pUser->pAcct != NULL) { sdbRelease(pSdb, pUser->pAcct); pUser->pAcct = NULL; } @@ -91,9 +96,13 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { } static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) { - SUserObj tObj; - int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj); - memcpy(pDstUser, pSrcUser, len); + mTrace("user:%s, perform update action", pSrcUser->user); + memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN); + memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN); + memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); + pSrcUser->createdTime = pDstUser->createdTime; + pSrcUser->updateTime = pDstUser->updateTime; + pSrcUser->rootAuth = pDstUser->rootAuth; return 0; } @@ -113,6 +122,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("user:%s, will be created while deploy sdb", userObj.user); return sdbWrite(pMnode->pSdb, pRaw); } @@ -196,7 +206,7 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return -1; } - SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->conn.user); + SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; mError("user:%s, failed to create since %s", pCreate->user, terrstr()); @@ -229,4 +239,15 @@ int32_t mndInitUser(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupUser(SMnode *pMnode) {} \ No newline at end of file +void mndCleanupUser(SMnode *pMnode) {} + +SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_USER, &userName); +} + +void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pUser); +} + diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 8c206a724ae44815a98d4d78ea275ed5ca7425ad..f1b46098d70904c2b02d28518b21fc7c597f01ba 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -346,28 +346,30 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to create msg since %s", terrstr()); return NULL; } - if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) { + SRpcConnInfo connInfo = {0}; + if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { mndCleanupMsg(pMsg); - mError("can not get user from conn:%p", pMsg->rpcMsg.handle); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + mError("failed to create msg since %s",terrstr()); return NULL; } + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + pMsg->pMnode = pMnode; pMsg->rpcMsg = *pRpcMsg; pMsg->createdTime = taosGetTimestampSec(); + mTrace("msg:%p, is created", pMsg); return pMsg; } void mndCleanupMsg(SMnodeMsg *pMsg) { - if (pMsg->pUser != NULL) { - sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser); - } - taosFreeQitem(pMsg); + mTrace("msg:%p, is destroyed", pMsg); } void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {} @@ -379,28 +381,31 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType % 2 == 1); + mTrace("msg:%p, type:%s will be processed", pMsg, taosMsg[msgType]); + if (isReq && !mndIsMaster(pMnode)) { code = TSDB_CODE_APP_NOT_READY; + mDebug("msg:%p, failed to process since %s", pMsg, terrstr()); goto PROCESS_RPC_END; } if (isReq && pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); code = TSDB_CODE_MND_INVALID_MSG_LEN; + mError("msg:%p, failed to process since %s", pMsg, terrstr()); goto PROCESS_RPC_END; } MndMsgFp fp = pMnode->msgFp[msgType]; if (fp == NULL) { - mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); code = TSDB_CODE_MSG_NOT_PROCESSED; + mError("msg:%p, failed to process since not handle", pMsg); goto PROCESS_RPC_END; } code = (*fp)(pMnode, pMsg); if (code != 0) { code = terrno; - mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr()); + mError("msg:%p, failed to process since %s", pMsg, terrstr()); goto PROCESS_RPC_END; } @@ -409,9 +414,11 @@ PROCESS_RPC_END: if (code == TSDB_CODE_APP_NOT_READY) { mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); } else if (code != 0) { - SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code}; - rpcSendResponse(&rspMsg); + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; + rpcSendResponse(&rpcRsp); } else { + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; + rpcSendResponse(&rpcRsp); } }