提交 5c889fff 编写于 作者: S Shengliang Guan

[TD-335] refact SMnodeMsg

上级 5a601c83
......@@ -29,23 +29,18 @@ struct STableObj;
struct SRpcMsg;
typedef struct {
int len;
void *rsp;
int32_t len;
void * rsp;
} SMnodeRsp;
typedef struct SMnodeMsg {
SMnodeRsp rpcRsp;
uint8_t msgType;
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int8_t maxRetry;
int32_t contLen;
int32_t code;
void * ahandle;
void * thandle;
void * pCont;
struct SRpcMsg rpcMsg;
struct SAcctObj * pAcct;
struct SDnodeObj *pDnode;
struct SUserObj * pUser;
......
......@@ -750,8 +750,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
}
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
SCMCreateDbMsg *pCreate = pMsg->pCont;
SCMCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->maxTables = htonl(pCreate->maxTables);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
......@@ -937,8 +936,8 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
}
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
SCMAlterDbMsg *pAlter = pMsg->pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
SCMAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pAlter->db);
if (pMsg->pDb == NULL) {
......@@ -974,8 +973,8 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
SCMDropDbMsg *pDrop = pMsg->rpcMsg.pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pDrop->db);
if (pMsg->pDb == NULL) {
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "trpc.h"
#include "tgrant.h"
#include "tbalance.h"
#include "tglobal.h"
......@@ -240,7 +241,7 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
}
static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont;
SCMCfgDnodeMsg *pCmCfgDnode = pMsg->rpcMsg.pCont;
if (pCmCfgDnode->ep[0] == 0) {
strcpy(pCmCfgDnode->ep, tsLocalEp);
} else {
......@@ -275,7 +276,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
}
static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
SDMStatusMsg *pStatus = pMsg->pCont;
SDMStatusMsg *pStatus = pMsg->rpcMsg.pCont;
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->moduleStatus = htonl(pStatus->moduleStatus);
pStatus->lastReboot = htonl(pStatus->lastReboot);
......@@ -442,7 +443,7 @@ static int32_t mnodeDropDnodeByEp(char *ep) {
}
static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
SCMCreateDnodeMsg *pCreate = pMsg->pCont;
SCMCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont;
if (strcmp(pMsg->pUser->user, "root") != 0) {
return TSDB_CODE_NO_RIGHTS;
......@@ -462,7 +463,7 @@ static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg) {
SCMDropDnodeMsg *pDrop = pMsg->pCont;
SCMDropDnodeMsg *pDrop = pMsg->rpcMsg.pCont;
if (strcmp(pMsg->pUser->user, "root") != 0) {
return TSDB_CODE_NO_RIGHTS;
......
......@@ -35,14 +35,11 @@
#include "mnodeVgroup.h"
void mnodeCreateMsg(SMnodeMsg *pMsg, SRpcMsg *rpcMsg) {
pMsg->thandle = rpcMsg->handle;
pMsg->msgType = rpcMsg->msgType;
pMsg->contLen = rpcMsg->contLen;
pMsg->pCont = rpcMsg->pCont;
pMsg->rpcMsg = *rpcMsg;
}
int32_t mnodeInitMsg(SMnodeMsg *pMsg) {
pMsg->pUser = mnodeGetUserFromConn(pMsg->thandle);
pMsg->pUser = mnodeGetUserFromConn(pMsg->rpcMsg.handle);
if (pMsg->pUser == NULL) {
return TSDB_CODE_INVALID_USER;
}
......@@ -52,7 +49,7 @@ int32_t mnodeInitMsg(SMnodeMsg *pMsg) {
void mnodeCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg != NULL) {
if (pMsg->pCont) rpcFreeCont(pMsg->pCont);
if (pMsg->rpcMsg.pCont) rpcFreeCont(pMsg->rpcMsg.pCont);
if (pMsg->pUser) mnodeDecUserRef(pMsg->pUser);
if (pMsg->pDb) mnodeDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mnodeDecVgroupRef(pMsg->pVgroup);
......
......@@ -47,8 +47,8 @@ void mnodeAddPeerRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
}
int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) {
mError("%p, msg:%s in mpeer queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mpeer queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
}
......@@ -59,7 +59,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
mTrace("%p, msg:%s in mpeer queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
}
......@@ -67,18 +67,18 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
return TSDB_CODE_REDIRECT;
}
if (tsMnodeProcessPeerMsgFp[pMsg->msgType] == NULL) {
mError("%p, msg:%s in mpeer queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mpeer queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED;
}
return (*tsMnodeProcessPeerMsgFp[pMsg->msgType])(pMsg);
return (*tsMnodeProcessPeerMsgFp[pMsg->rpcMsg.msgType])(pMsg);
}
void mnodeProcessPeerRsp(SRpcMsg *pMsg) {
if (tsMnodeProcessPeerRspFp[pMsg->msgType]) {
(*tsMnodeProcessPeerRspFp[pMsg->msgType])(pMsg);
} else {
mError("msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
mError("%p, msg:%s is not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
}
}
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsched.h"
#include "tbalance.h"
......@@ -42,8 +43,8 @@ void mnodeAddReadMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *pMsg)) {
}
int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) {
mError("%p, msg:%s in mread queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mread queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
}
......@@ -54,7 +55,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
mTrace("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
}
......@@ -62,16 +63,16 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
return TSDB_CODE_REDIRECT;
}
if (tsMnodeProcessReadMsgFp[pMsg->msgType] == NULL) {
mError("%p, msg:%s in mread queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mread queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED;
}
int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
mError("%p, msg:%s in mread queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
return code;
}
return (*tsMnodeProcessReadMsgFp[pMsg->msgType])(pMsg);
return (*tsMnodeProcessReadMsgFp[pMsg->rpcMsg.msgType])(pMsg);
}
......@@ -106,7 +106,7 @@ static char *mnodeGetShowType(int32_t showType) {
}
static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont;
SCMShowMsg *pShowMsg = pMsg->rpcMsg.pCont;
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
return TSDB_CODE_INVALID_MSG_TYPE;
}
......@@ -137,7 +137,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
pShowRsp->qhandle = htobe64((uint64_t) pShow);
mTrace("show:%p, type:%s, start to get meta", pShow, mnodeGetShowType(pShowMsg->type));
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->thandle);
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle);
if (code == 0) {
pMsg->rpcRsp.rsp = pShowRsp;
pMsg->rpcRsp.len = sizeof(SCMShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
......@@ -153,7 +153,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
SRetrieveTableMsg *pRetrieve = pMsg->rpcMsg.pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
/*
......@@ -187,7 +187,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->thandle);
rowsRead = (*tsMnodeShowRetrieveFp[pShow->type])(pShow, pRsp->data, rowsToRead, pMsg->rpcMsg.handle);
if (rowsRead < 0) {
rpcFreeCont(pRsp);
......@@ -236,12 +236,12 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
SCMConnectMsg *pConnectMsg = pMsg->pCont;
SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
int32_t code = TSDB_CODE_SUCCESS;
SRpcConnInfo connInfo;
if (rpcGetConnInfo(pMsg->thandle, &connInfo) != 0) {
mError("thandle:%p is already released while process connect msg", pMsg->thandle);
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) {
mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle);
code = TSDB_CODE_INVALID_MSG_CONTENT;
goto connect_over;
}
......@@ -291,7 +291,7 @@ connect_over:
}
static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
SCMUseDbMsg *pUseDbMsg = pMsg->pCont;
SCMUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont;
int32_t code = TSDB_CODE_SUCCESS;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db);
......
......@@ -660,7 +660,7 @@ static void mgmtExtractTableName(char* tableId, char* name) {
}
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreate->db);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
......@@ -683,16 +683,16 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
}
if (pCreate->numOfTags != 0) {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle);
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateSuperTableMsg(pMsg);
} else {
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle);
mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
return mnodeProcessCreateChildTableMsg(pMsg);
}
}
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont;
SCMDropTableMsg *pDrop = pMsg->rpcMsg.pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pDrop->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
......@@ -725,9 +725,9 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont;
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->createFlag = htons(pInfo->createFlag);
mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->thandle, pInfo->createFlag);
mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pInfo->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
......@@ -753,7 +753,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj));
if (pStable == NULL) {
mError("table:%s, failed to create, no enough memory", pCreate->tableId);
......@@ -1257,7 +1257,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
SCMSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont;
int32_t numOfTable = htonl(pInfo->numOfTables);
// reserve space
......@@ -1479,7 +1479,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
}
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId);
......@@ -1521,8 +1521,6 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
}
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup);
pMsg->ahandle = pMsg->pTable;
pMsg->maxRetry = 10;
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
......@@ -1559,7 +1557,6 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
mPrint("table:%s, send drop ctable msg", pDrop->tableId);
pMsg->ahandle = pMsg->pTable;
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pDrop,
......@@ -1726,7 +1723,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
}
static int32_t mgmtAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->pCont;
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STagData* pTag = (STagData*)pInfo->tags;
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen);
......@@ -1744,10 +1741,10 @@ static int32_t mgmtAutoCreateChildTable(SMnodeMsg *pMsg) {
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
pMsg->msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
rpcFreeCont(pMsg->pCont);
pMsg->pCont = pCreateMsg;
pMsg->contLen = contLen;
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
pMsg->rpcMsg.pCont = pCreateMsg;
pMsg->rpcMsg.contLen = contLen;
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, pInfo->tags);
......@@ -1868,7 +1865,7 @@ static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) {
}
static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
SDMConfigTableMsg *pCfg = (SDMConfigTableMsg *) pMsg->pCont;
SDMConfigTableMsg *pCfg = pMsg->rpcMsg.pCont;
pCfg->dnode = htonl(pCfg->dnode);
pCfg->vnode = htonl(pCfg->vnode);
pCfg->sid = htonl(pCfg->sid);
......@@ -1911,8 +1908,9 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
SMnodeMsg *mnodeMsg = rpcMsg->handle;
mnodeMsg->received++;
SChildTableObj *pTable = mnodeMsg->ahandle;
mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle, tstrerror(rpcMsg->code));
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code));
......@@ -1957,18 +1955,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SMnodeMsg *mnodeMsg = rpcMsg->handle;
mnodeMsg->received++;
SChildTableObj *pTable = mnodeMsg->ahandle;
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle,
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
tstrerror(rpcMsg->code));
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
if (mnodeMsg->retry++ < mnodeMsg->maxRetry) {
if (mnodeMsg->retry++ < 10) {
mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId,
mnodeMsg->retry, mnodeMsg->thandle, tstrerror(rpcMsg->code));
mnodeMsg->retry, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
} else {
mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId,
mnodeMsg->thandle, tstrerror(rpcMsg->code));
mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
......@@ -1980,9 +1979,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
}
} else {
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->thandle,
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
tstrerror(rpcMsg->code));
SCMCreateTableMsg *pCreate = mnodeMsg->pCont;
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
if (pCreate->getMeta) {
mTrace("table:%s, continue to get meta", pTable->info.tableId);
mnodeMsg->retry = 0;
......@@ -1999,7 +1998,7 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
}
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
SCMMultiTableInfoMsg *pInfo = pMsg->pCont;
SCMMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
int32_t totalMallocLen = 4 * 1024 * 1024; // first malloc 4 MB, subsequent reallocation as twice
......@@ -2183,8 +2182,8 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
}
static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
SCMAlterTableMsg *pAlter = pMsg->pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle);
SCMAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
......
......@@ -352,7 +352,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) {
SUserObj *pOperUser = pMsg->pUser;
if (pOperUser->superAuth) {
SCMCreateUserMsg *pCreate = pMsg->pCont;
SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user);
......@@ -369,7 +369,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
int32_t code;
SUserObj *pOperUser = pMsg->pUser;
SCMAlterUserMsg *pAlter = pMsg->pCont;
SCMAlterUserMsg *pAlter = pMsg->rpcMsg.pCont;
SUserObj *pUser = mnodeGetUser(pAlter->user);
if (pUser == NULL) {
return TSDB_CODE_INVALID_USER;
......@@ -459,7 +459,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) {
int32_t code;
SUserObj *pOperUser = pMsg->pUser;
SCMDropUserMsg *pDrop = pMsg->pCont;
SCMDropUserMsg *pDrop = pMsg->rpcMsg.pCont;
SUserObj *pUser = mnodeGetUser(pDrop->user);
if (pUser == NULL) {
return TSDB_CODE_INVALID_USER;
......@@ -552,7 +552,7 @@ int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, cha
}
static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) {
SDMAuthMsg *pAuthMsg = pMsg->pCont;
SDMAuthMsg *pAuthMsg = pMsg->rpcMsg.pCont;
SDMAuthRsp *pAuthRsp = rpcMallocCont(sizeof(SDMAuthRsp));
pMsg->rpcRsp.rsp = pAuthRsp;
......
......@@ -331,7 +331,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
}
pMsg->ahandle = pVgroup;
pMsg->pVgroup = pVgroup;
pMsg->expected = pVgroup->numOfVnodes;
mnodeSendCreateVgroupMsg(pVgroup, pMsg);
......@@ -626,10 +626,10 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
mnodeMsg->successed++;
}
SVgObj *pVgroup = mnodeMsg->ahandle;
SVgObj *pVgroup = mnodeMsg->pVgroup;
mTrace("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
mnodeMsg->thandle, rpcMsg->handle);
mnodeMsg->rpcMsg.handle, rpcMsg->handle);
if (mnodeMsg->received != mnodeMsg->expected) return;
......@@ -690,10 +690,10 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
mnodeMsg->successed++;
}
SVgObj *pVgroup = mnodeMsg->ahandle;
SVgObj *pVgroup = mnodeMsg->pVgroup;
mTrace("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
mnodeMsg->thandle, rpcMsg->handle);
mnodeMsg->rpcMsg.handle, rpcMsg->handle);
if (mnodeMsg->received != mnodeMsg->expected) return;
......@@ -711,7 +711,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
}
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) {
SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) pMsg->pCont;
SDMConfigVnodeMsg *pCfg = pMsg->rpcMsg.pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->vgId = htonl(pCfg->vgId);
......
......@@ -20,6 +20,7 @@
#include "tbalance.h"
#include "tgrant.h"
#include "tglobal.h"
#include "trpc.h"
#include "mnode.h"
#include "dnode.h"
#include "mnodeDef.h"
......@@ -41,8 +42,8 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
}
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if (pMsg->pCont == NULL) {
mError("%p, msg:%s in mwrite queue, content is null", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (pMsg->rpcMsg.pCont == NULL) {
mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_INVALID_MSG_LEN;
}
......@@ -53,7 +54,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = ipSet;
rpcRsp->len = sizeof(SRpcIpSet);
mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->ahandle, taosMsg[pMsg->msgType], ipSet->inUse);
mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
}
......@@ -61,21 +62,21 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
return TSDB_CODE_REDIRECT;
}
if (tsMnodeProcessWriteMsgFp[pMsg->msgType] == NULL) {
mError("%p, msg:%s in mwrite queue, not processed", pMsg->ahandle, taosMsg[pMsg->msgType]);
if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) {
mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_MSG_NOT_PROCESSED;
}
int32_t code = mnodeInitMsg(pMsg);
if (code != TSDB_CODE_SUCCESS) {
mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->ahandle, taosMsg[pMsg->msgType], tstrerror(code));
mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
return code;
}
if (!pMsg->pUser->writeAuth) {
mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->ahandle, taosMsg[pMsg->msgType]);
mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
return TSDB_CODE_NO_RIGHTS;
}
return (*tsMnodeProcessWriteMsgFp[pMsg->msgType])(pMsg);
return (*tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType])(pMsg);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册