From 5c889fff5481b9433ede1af2b05696d2e412612f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 May 2020 02:13:38 +0000 Subject: [PATCH] [TD-335] refact SMnodeMsg --- src/inc/mnode.h | 11 ++----- src/mnode/src/mnodeDb.c | 11 ++++--- src/mnode/src/mnodeDnode.c | 9 +++--- src/mnode/src/mnodeInt.c | 9 ++---- src/mnode/src/mnodePeer.c | 14 ++++----- src/mnode/src/mnodeRead.c | 15 +++++----- src/mnode/src/mnodeShow.c | 16 +++++----- src/mnode/src/mnodeTable.c | 59 ++++++++++++++++++------------------- src/mnode/src/mnodeUser.c | 8 ++--- src/mnode/src/mnodeVgroup.c | 12 ++++---- src/mnode/src/mnodeWrite.c | 17 ++++++----- 11 files changed, 87 insertions(+), 94 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index da0a899f37..513e81a461 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -29,23 +29,18 @@ struct STableObj; struct SRpcMsg; typedef struct { - int len; - void *rsp; + int32_t len; + void * rsp; } SMnodeRsp; typedef struct SMnodeMsg { SMnodeRsp rpcRsp; - uint8_t msgType; int8_t received; int8_t successed; int8_t expected; int8_t retry; - int8_t maxRetry; - int32_t contLen; int32_t code; - void * ahandle; - void * thandle; - void * pCont; + struct SRpcMsg rpcMsg; struct SAcctObj * pAcct; struct SDnodeObj *pDnode; struct SUserObj * pUser; diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 09fd7f5b3e..1220988221 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -750,8 +750,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { } static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { - SCMCreateDbMsg *pCreate = pMsg->pCont; - + SCMCreateDbMsg *pCreate = pMsg->rpcMsg.pCont; pCreate->maxTables = htonl(pCreate->maxTables); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->totalBlocks = htonl(pCreate->totalBlocks); @@ -937,8 +936,8 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { } static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { - SCMAlterDbMsg *pAlter = pMsg->pCont; - mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle); + SCMAlterDbMsg *pAlter = pMsg->rpcMsg.pCont; + mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->rpcMsg.handle); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pAlter->db); if (pMsg->pDb == NULL) { @@ -974,8 +973,8 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { } static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { - SCMDropDbMsg *pDrop = pMsg->pCont; - mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle); + SCMDropDbMsg *pDrop = pMsg->rpcMsg.pCont; + mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->rpcMsg.handle); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pDrop->db); if (pMsg->pDb == NULL) { diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 5872081d67..8ce710110e 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "trpc.h" #include "tgrant.h" #include "tbalance.h" #include "tglobal.h" @@ -240,7 +241,7 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) { } static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { - SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; + SCMCfgDnodeMsg *pCmCfgDnode = pMsg->rpcMsg.pCont; if (pCmCfgDnode->ep[0] == 0) { strcpy(pCmCfgDnode->ep, tsLocalEp); } else { @@ -275,7 +276,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { } static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { - SDMStatusMsg *pStatus = pMsg->pCont; + SDMStatusMsg *pStatus = pMsg->rpcMsg.pCont; pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->moduleStatus = htonl(pStatus->moduleStatus); pStatus->lastReboot = htonl(pStatus->lastReboot); @@ -442,7 +443,7 @@ static int32_t mnodeDropDnodeByEp(char *ep) { } static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) { - SCMCreateDnodeMsg *pCreate = pMsg->pCont; + SCMCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; if (strcmp(pMsg->pUser->user, "root") != 0) { return TSDB_CODE_NO_RIGHTS; @@ -462,7 +463,7 @@ static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg) { - SCMDropDnodeMsg *pDrop = pMsg->pCont; + SCMDropDnodeMsg *pDrop = pMsg->rpcMsg.pCont; if (strcmp(pMsg->pUser->user, "root") != 0) { return TSDB_CODE_NO_RIGHTS; diff --git a/src/mnode/src/mnodeInt.c b/src/mnode/src/mnodeInt.c index a701f1e1f4..1cb421bef7 100644 --- a/src/mnode/src/mnodeInt.c +++ b/src/mnode/src/mnodeInt.c @@ -35,14 +35,11 @@ #include "mnodeVgroup.h" void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg) { - pMsg->thandle = rpcMsg->handle; - pMsg->msgType = rpcMsg->msgType; - pMsg->contLen = rpcMsg->contLen; - pMsg->pCont = rpcMsg->pCont; + pMsg->rpcMsg = *rpcMsg; } int32_t mnodeInitMsg(SMnodeMsg *pMsg) { - pMsg->pUser = mnodeGetUserFromConn(pMsg->thandle); + pMsg->pUser = mnodeGetUserFromConn(pMsg->rpcMsg.handle); if (pMsg->pUser == NULL) { return TSDB_CODE_INVALID_USER; } @@ -52,7 +49,7 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) { void mnodeCleanupMsg(SMnodeMsg *pMsg) { if (pMsg != NULL) { - if (pMsg->pCont) rpcFreeCont(pMsg->pCont); + if (pMsg->rpcMsg.pCont) rpcFreeCont(pMsg->rpcMsg.pCont); if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser); if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb); if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup); diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 8acd12dce3..3594b60cf1 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -47,8 +47,8 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { } int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { - if (pMsg->pCont == NULL) { - mError("%p, msg:%s in mpeer queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (pMsg->rpcMsg.pCont == NULL) { + mError("%p, msg:%s in mpeer queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -67,18 +67,18 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { return TSDB_CODE_REDIRECT; } - if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) { - mError("%p, msg:%s in mpeer queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType] == NULL) { + mError("%p, msg:%s in mpeer queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } - return (*tsMnodeProcessPeerMsgFp[pMsg->msgType])(pMsg); + return (*tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType])(pMsg); } void mnodeProcessPeerRsp(SRpcMsg *pMsg) { if (tsMnodeProcessPeerRspFp[pMsg->msgType]) { (*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg); } else { - mError("msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); + mError("%p, msg:%s is not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); } } diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index 172a27a52f..cc58f89041 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "trpc.h" #include "taosdef.h" #include "tsched.h" #include "tbalance.h" @@ -42,8 +43,8 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) { } int32_t mnodeProcessRead(SMnodeMsg *pMsg) { - if (pMsg->pCont == NULL) { - mError("%p, msg:%s in mread queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (pMsg->rpcMsg.pCont == NULL) { + mError("%p, msg:%s in mread queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -54,7 +55,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -62,16 +63,16 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { return TSDB_CODE_REDIRECT; } - if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) { - mError("%p, msg:%s in mread queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType] == NULL) { + mError("%p, msg:%s in mread queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } int32_t code = mnodeInitMsg(pMsg); if (code != TSDB_CODE_SUCCESS) { - mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code)); return code; } - return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg); + return (*tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType])(pMsg); } diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 2a42ad869e..c29c3dab68 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -106,7 +106,7 @@ static char *mnodeGetShowType(int32_t showType) { } static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { - SCMShowMsg *pShowMsg = pMsg->pCont; + SCMShowMsg *pShowMsg = pMsg->rpcMsg.pCont; if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { return TSDB_CODE_INVALID_MSG_TYPE; } @@ -137,7 +137,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { pShowRsp->qhandle = htobe64((uint64_t) pShow); mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type)); - int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle); + int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle); if (code == 0) { pMsg->rpcRsp.rsp = pShowRsp; pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns; @@ -153,7 +153,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; - SRetrieveTableMsg *pRetrieve = pMsg->pCont; + SRetrieveTableMsg *pRetrieve = pMsg->rpcMsg.pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); /* @@ -187,7 +187,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle); + rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->rpcMsg.handle); if (rowsRead < 0) { rpcFreeCont(pRsp); @@ -236,12 +236,12 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { - SCMConnectMsg *pConnectMsg = pMsg->pCont; + SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; int32_t code = TSDB_CODE_SUCCESS; SRpcConnInfo connInfo; - if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) { - mError("thandle:%p is already released while process connect msg", pMsg->thandle); + if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) { + mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle); code = TSDB_CODE_INVALID_MSG_CONTENT; goto connect_over; } @@ -291,7 +291,7 @@ connect_over: } static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) { - SCMUseDbMsg *pUseDbMsg = pMsg->pCont; + SCMUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont; int32_t code = TSDB_CODE_SUCCESS; if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 9399c29be8..7eb6320763 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -660,7 +660,7 @@ static void mgmtExtractTableName(char* tableId, char* name) { } static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreate->db); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { @@ -683,16 +683,16 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) { } if (pCreate->numOfTags != 0) { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); + mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateSuperTableMsg(pMsg); } else { - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); + mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle); return mnodeProcessCreateChildTableMsg(pMsg); } } static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { - SCMDropTableMsg *pDrop = pMsg->pCont; + SCMDropTableMsg *pDrop = pMsg->rpcMsg.pCont; if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); @@ -725,9 +725,9 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { - SCMTableInfoMsg *pInfo = pMsg->pCont; + SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; pInfo->createFlag = htons(pInfo->createFlag); - mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->thandle, pInfo->createFlag); + mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pInfo->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { @@ -753,7 +753,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { } static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { mError("table:%s, failed to create, no enough memory", pCreate->tableId); @@ -1257,7 +1257,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { } static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { - SCMSTableVgroupMsg *pInfo = pMsg->pCont; + SCMSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont; int32_t numOfTable = htonl(pInfo->numOfTables); // reserve space @@ -1479,7 +1479,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb } static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { - SCMCreateTableMsg *pCreate = pMsg->pCont; + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); if (code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId); @@ -1521,8 +1521,6 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { } SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup); - pMsg->ahandle = pMsg->pTable; - pMsg->maxRetry = 10; SRpcMsg rpcMsg = { .handle = pMsg, .pCont = pMDCreate, @@ -1559,7 +1557,6 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup); mPrint("table:%s, send drop ctable msg", pDrop->tableId); - pMsg->ahandle = pMsg->pTable; SRpcMsg rpcMsg = { .handle = pMsg, .pCont = pDrop, @@ -1726,7 +1723,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { } static int32_t mgmtAutoCreateChildTable(SMnodeMsg *pMsg) { - SCMTableInfoMsg *pInfo = pMsg->pCont; + SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; STagData* pTag = (STagData*)pInfo->tags; int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen); @@ -1744,10 +1741,10 @@ static int32_t mgmtAutoCreateChildTable(SMnodeMsg *pMsg) { memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); - pMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; - rpcFreeCont(pMsg->pCont); - pMsg->pCont = pCreateMsg; - pMsg->contLen = contLen; + rpcFreeCont(pMsg->rpcMsg.pCont); + pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; + pMsg->rpcMsg.pCont = pCreateMsg; + pMsg->rpcMsg.contLen = contLen; mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags); @@ -1868,7 +1865,7 @@ static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) { } static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) { - SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pMsg->pCont; + SDMConfigTableMsg *pCfg = pMsg->rpcMsg.pCont; pCfg->dnode = htonl(pCfg->dnode); pCfg->vnode = htonl(pCfg->vnode); pCfg->sid = htonl(pCfg->sid); @@ -1911,8 +1908,9 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) { SMnodeMsg *mnodeMsg = rpcMsg->handle; mnodeMsg->received++; - SChildTableObj *pTable = mnodeMsg->ahandle; - mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle, tstrerror(rpcMsg->code)); + SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; + assert(pTable); + mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); @@ -1957,18 +1955,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { SMnodeMsg *mnodeMsg = rpcMsg->handle; mnodeMsg->received++; - SChildTableObj *pTable = mnodeMsg->ahandle; - mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle, + SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable; + assert(pTable); + mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { - if (mnodeMsg->retry++ < mnodeMsg->maxRetry) { + if (mnodeMsg->retry++ < 10) { mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId, - mnodeMsg->retry, mnodeMsg->thandle, tstrerror(rpcMsg->code)); + mnodeMsg->retry, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); dnodeDelayReprocessMnodeWriteMsg(mnodeMsg); } else { mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId, - mnodeMsg->thandle, tstrerror(rpcMsg->code)); + mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -1980,9 +1979,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code); } } else { - mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle, + mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - SCMCreateTableMsg *pCreate = mnodeMsg->pCont; + SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; if (pCreate->getMeta) { mTrace("table:%s, continue to get meta", pTable->info.tableId); mnodeMsg->retry = 0; @@ -1999,7 +1998,7 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { } static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { - SCMMultiTableInfoMsg *pInfo = pMsg->pCont; + SCMMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); int32_t totalMallocLen = 4 * 1024 * 1024; // first malloc 4 MB, subsequent reallocation as twice @@ -2183,8 +2182,8 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows } static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { - SCMAlterTableMsg *pAlter = pMsg->pCont; - mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); + SCMAlterTableMsg *pAlter = pMsg->rpcMsg.pCont; + mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->rpcMsg.handle); if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pAlter->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index ad6370302a..a1edaaa0a8 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -352,7 +352,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) { SUserObj *pOperUser = pMsg->pUser; if (pOperUser->superAuth) { - SCMCreateUserMsg *pCreate = pMsg->pCont; + SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass); if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user); @@ -369,7 +369,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { int32_t code; SUserObj *pOperUser = pMsg->pUser; - SCMAlterUserMsg *pAlter = pMsg->pCont; + SCMAlterUserMsg *pAlter = pMsg->rpcMsg.pCont; SUserObj *pUser = mnodeGetUser(pAlter->user); if (pUser == NULL) { return TSDB_CODE_INVALID_USER; @@ -459,7 +459,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) { int32_t code; SUserObj *pOperUser = pMsg->pUser; - SCMDropUserMsg *pDrop = pMsg->pCont; + SCMDropUserMsg *pDrop = pMsg->rpcMsg.pCont; SUserObj *pUser = mnodeGetUser(pDrop->user); if (pUser == NULL) { return TSDB_CODE_INVALID_USER; @@ -552,7 +552,7 @@ int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, cha } static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) { - SDMAuthMsg *pAuthMsg = pMsg->pCont; + SDMAuthMsg *pAuthMsg = pMsg->rpcMsg.pCont; SDMAuthRsp *pAuthRsp = rpcMallocCont(sizeof(SDMAuthRsp)); pMsg->rpcRsp.rsp = pAuthRsp; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 69baa09c95..0d235f898d 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -331,7 +331,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); } - pMsg->ahandle = pVgroup; + pMsg->pVgroup = pVgroup; pMsg->expected = pVgroup->numOfVnodes; mnodeSendCreateVgroupMsg(pVgroup, pMsg); @@ -626,10 +626,10 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { mnodeMsg->successed++; } - SVgObj *pVgroup = mnodeMsg->ahandle; + SVgObj *pVgroup = mnodeMsg->pVgroup; mTrace("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, - mnodeMsg->thandle, rpcMsg->handle); + mnodeMsg->rpcMsg.handle, rpcMsg->handle); if (mnodeMsg->received != mnodeMsg->expected) return; @@ -690,10 +690,10 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { mnodeMsg->successed++; } - SVgObj *pVgroup = mnodeMsg->ahandle; + SVgObj *pVgroup = mnodeMsg->pVgroup; mTrace("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected, - mnodeMsg->thandle, rpcMsg->handle); + mnodeMsg->rpcMsg.handle, rpcMsg->handle); if (mnodeMsg->received != mnodeMsg->expected) return; @@ -711,7 +711,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) { } static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) { - SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pMsg->pCont; + SDMConfigVnodeMsg *pCfg = pMsg->rpcMsg.pCont; pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->vgId = htonl(pCfg->vgId); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 1741d04fc6..8b3d82d32a 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -20,6 +20,7 @@ #include "tbalance.h" #include "tgrant.h" #include "tglobal.h" +#include "trpc.h" #include "mnode.h" #include "dnode.h" #include "mnodeDef.h" @@ -41,8 +42,8 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg)) } int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { - if (pMsg->pCont == NULL) { - mError("%p, msg:%s in mwrite queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (pMsg->rpcMsg.pCont == NULL) { + mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_INVALID_MSG_LEN; } @@ -53,7 +54,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = ipSet; rpcRsp->len = sizeof(SRpcIpSet); - mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse); + mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse); for (int32_t i = 0; i < ipSet->numOfIps; ++i) { mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i])); } @@ -61,21 +62,21 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { return TSDB_CODE_REDIRECT; } - if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) { - mError("%p, msg:%s in mwrite queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]); + if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) { + mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_MSG_NOT_PROCESSED; } int32_t code = mnodeInitMsg(pMsg); if (code != TSDB_CODE_SUCCESS) { - mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code)); + mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code)); return code; } if (!pMsg->pUser->writeAuth) { - mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->ahandle, taosMsg[pMsg->msgType]); + mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]); return TSDB_CODE_NO_RIGHTS; } - return (*tsMnodeProcessWriteMsgFp[pMsg->msgType])(pMsg); + return (*tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType])(pMsg); } -- GitLab