提交 d27efd6a 编写于 作者: S Shengliang Guan

TD-10431 process status msg

上级 f8fa8565
......@@ -470,7 +470,7 @@ static void *dnodeThreadRoutine(void *param) {
pthread_testcancel();
if (dndGetStat(pDnode) == DND_STAT_RUNNING) {
// dndSendStatusMsg(pDnode);
dndSendStatusMsg(pDnode);
}
}
}
......
......@@ -184,7 +184,7 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
}
code = 0;
dInfo("succcessed to read file %s", pMgmt->file);
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
PRASE_MNODE_OVER:
if (content != NULL) free(content);
......@@ -241,7 +241,7 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
return -1;
}
dInfo("successed to write %s", pMgmt->file);
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
return 0;
}
......@@ -396,29 +396,34 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = dndStartMnodeWorker(pDnode);
if (code != 0) {
dError("failed to start mnode worker since %s", terrstr());
return code;
}
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
pMgmt->deployed = 1;
int32_t code = dndWriteMnodeFile(pDnode);
if (code != 0) {
dError("failed to write mnode file since %s", terrstr());
code = terrno;
dndStopMnodeWorker(pDnode);
pMgmt->deployed = 0;
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
terrno = code;
return code;
return -1;
}
if (dndWriteMnodeFile(pDnode) != 0) {
dError("failed to write mnode file since %s", terrstr());
code = dndStartMnodeWorker(pDnode);
if (code != 0) {
dError("failed to start mnode worker since %s", terrstr());
code = terrno;
pMgmt->deployed = 0;
dndStopMnodeWorker(pDnode);
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
terrno = code;
return code;
return -1;
}
taosWLockLatch(&pMgmt->latch);
......@@ -426,6 +431,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("mnode open successfully");
return 0;
}
......
......@@ -299,25 +299,18 @@ typedef struct {
char payload[];
} SShowObj;
typedef struct {
int32_t len;
void *rsp;
} SMnodeRsp;
typedef struct SMnodeMsg {
char user[TSDB_USER_LEN];
SMnode *pMnode;
void (*fp)(SMnodeMsg *pMsg, int32_t code);
SRpcConnInfo conn;
SUserObj *pUser;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SMnodeRsp rpcRsp;
SRpcMsg rpcMsg;
char pCont[];
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SRpcMsg rpcMsg;
int32_t contLen;
void *pCont;
} SMnodeMsg;
#ifdef __cplusplus
......
......@@ -22,8 +22,10 @@
extern "C" {
#endif
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
#ifdef __cplusplus
}
......
......@@ -44,6 +44,7 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_ACCT_VER) {
mError("failed to decode acct since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
......@@ -68,14 +69,26 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
return pRow;
}
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform insert action", pAcct->acct);
memset(&pAcct->info, 0, sizeof(SAcctInfo));
return 0;
}
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform delete action", pAcct->acct);
return 0;
}
static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
SAcctObj tObj;
int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj);
memcpy(pDstAcct, pSrcAcct, len);
mTrace("acct:%s, perform update action", pSrcAcct->acct);
memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN);
pSrcAcct->createdTime = pDstAcct->createdTime;
pSrcAcct->updateTime = pDstAcct->updateTime;
pSrcAcct->acctId = pDstAcct->acctId;
pSrcAcct->status = pDstAcct->status;
pSrcAcct->cfg = pDstAcct->cfg;
return 0;
}
......@@ -98,6 +111,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("acct:%s, will be created while deploy sdb", acctObj.acct);
return sdbWrite(pMnode->pSdb, pRaw);
}
......
......@@ -107,8 +107,8 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj
pSrcDnode->createdTime = pDstDnode->createdTime;
pSrcDnode->updateTime = pDstDnode->updateTime;
pSrcDnode->port = pDstDnode->port;
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN);
mnodeResetDnode(pSrcDnode);
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN);
return 0;
}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
......@@ -211,8 +211,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return 0;
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
......@@ -225,6 +224,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
mndParseStatusMsg(pStatus);
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
......@@ -305,8 +309,8 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
pRsp->dnodeCfg.clusterId = htobe64(clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->rpcRsp.len = contLen;
pMsg->rpcRsp.rsp = pRsp;
pMsg->contLen = contLen;
pMsg->pCont = pRsp;
mndReleaseDnode(pMnode, pDnode);
return 0;
......
......@@ -63,6 +63,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) {
pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id);
if (pMnodeObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("mnode:%d, failed to perform insert action since %s", pMnodeObj->id, terrstr());
return -1;
}
......@@ -85,12 +86,12 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj
pSrcMnode->id = pDstMnode->id;
pSrcMnode->createdTime = pDstMnode->createdTime;
pSrcMnode->updateTime = pDstMnode->updateTime;
mnodeResetMnode(pSrcMnode);
return 0;
}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = 0;
mnodeObj.id = 1;
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
......
......@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "tkey.h"
#include "mndTrans.h"
#include "tkey.h"
#define SDB_USER_VER 1
......@@ -41,6 +41,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_USER_VER) {
mError("failed to decode user since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
......@@ -61,15 +62,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
}
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform insert action", pUser->user);
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->prohibitDbHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1;
}
pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
if (pUser->pAcct == NULL) {
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1;
}
......@@ -77,12 +81,13 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
}
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action", pUser->user);
if (pUser->prohibitDbHash) {
taosHashCleanup(pUser->prohibitDbHash);
pUser->prohibitDbHash = NULL;
}
if (pUser->acct != NULL) {
if (pUser->pAcct != NULL) {
sdbRelease(pSdb, pUser->pAcct);
pUser->pAcct = NULL;
}
......@@ -91,9 +96,13 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
}
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) {
SUserObj tObj;
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
memcpy(pDstUser, pSrcUser, len);
mTrace("user:%s, perform update action", pSrcUser->user);
memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN);
memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN);
memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN);
pSrcUser->createdTime = pDstUser->createdTime;
pSrcUser->updateTime = pDstUser->updateTime;
pSrcUser->rootAuth = pDstUser->rootAuth;
return 0;
}
......@@ -113,6 +122,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("user:%s, will be created while deploy sdb", userObj.user);
return sdbWrite(pMnode->pSdb, pRaw);
}
......@@ -196,7 +206,7 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return -1;
}
SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->conn.user);
SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
......@@ -229,4 +239,15 @@ int32_t mndInitUser(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupUser(SMnode *pMnode) {}
\ No newline at end of file
void mndCleanupUser(SMnode *pMnode) {}
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_USER, &userName);
}
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pUser);
}
......@@ -346,28 +346,30 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create msg since %s", terrstr());
return NULL;
}
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
mndCleanupMsg(pMsg);
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("failed to create msg since %s",terrstr());
return NULL;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->pMnode = pMnode;
pMsg->rpcMsg = *pRpcMsg;
pMsg->createdTime = taosGetTimestampSec();
mTrace("msg:%p, is created", pMsg);
return pMsg;
}
void mndCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) {
sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser);
}
taosFreeQitem(pMsg);
mTrace("msg:%p, is destroyed", pMsg);
}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {}
......@@ -379,28 +381,31 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1);
mTrace("msg:%p, type:%s will be processed", pMsg, taosMsg[msgType]);
if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s", pMsg, terrstr());
goto PROCESS_RPC_END;
}
if (isReq && pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
mError("msg:%p, failed to process since %s", pMsg, terrstr());
goto PROCESS_RPC_END;
}
MndMsgFp fp = pMnode->msgFp[msgType];
if (fp == NULL) {
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, failed to process since not handle", pMsg);
goto PROCESS_RPC_END;
}
code = (*fp)(pMnode, pMsg);
if (code != 0) {
code = terrno;
mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr());
mError("msg:%p, failed to process since %s", pMsg, terrstr());
goto PROCESS_RPC_END;
}
......@@ -409,9 +414,11 @@ PROCESS_RPC_END:
if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
} else if (code != 0) {
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rspMsg);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp);
} else {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
rpcSendResponse(&rpcRsp);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册