From bfbd2707212626705ce7ab2b884d219edc37b26e Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 8 Mar 2020 21:30:09 +0800 Subject: [PATCH] fix compile error in mnode while rpc interface changed --- src/mnode/inc/mgmtShell.h | 2 - src/mnode/src/mgmtDnodeInt.c | 18 +- src/mnode/src/mgmtShell.c | 602 +++++++++++++++++++++-------------- src/mnode/src/mgmtTable.c | 2 +- 4 files changed, 375 insertions(+), 249 deletions(-) diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 06b0068652..d1d4433683 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,8 +28,6 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); /* * If table not exist, will create it diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 100e76b10b..4979f4bb37 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -157,13 +157,19 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con } if (code != TSDB_CODE_SUCCESS) { - rpcSendResponse(info->thandle, code, NULL, 0); + SRpcMsg rpcMsg = {0}; + rpcMsg.code = code; + rpcMsg.handle = info->thandle; + rpcSendResponse(&rpcMsg); } else { if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) { mTrace("table:%s, start to process get meta", pTable->tableId); mgmtProcessGetTableMeta(pTable, thandle); } else { - rpcSendResponse(info->thandle, code, NULL, 0); + SRpcMsg rpcMsg = {0}; + rpcMsg.code = code; + rpcMsg.handle = info->thandle; + rpcSendResponse(&rpcMsg); } } @@ -236,7 +242,11 @@ static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) { mgmtUpdateGrantInfoFp(pCont); mTrace("grant info is updated"); } - rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.code = TSDB_CODE_SUCCESS; + rpcMsg.handle = thandle; + rpcSendResponse(&rpcMsg); } void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { @@ -368,7 +378,7 @@ int32_t mgmtSendCfgDnodeMsg(char *cont) { //#else // (void)tsCfgDynamicOptions(pCfg->config); //#endif -// return 0; + return 0; } int32_t mgmtInitDnodeInt() { diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index a889c16f75..6567872861 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -46,34 +46,36 @@ static RetrieveMetaFp mgmtRetrieveFp[TSDB_MGMT_TABLE_MAX] = {0}; static void mgmtInitShowMsgFp(); static void mgmtInitProcessShellMsg(); -static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); -static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle); -static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); +static void mgmtProcessMsgFromShell(SRpcMsg *msg); +static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(SRpcMsg *msg); +static void mgmtProcessUnSupportMsg(SRpcMsg *msg); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); void *tsShellConnServer = NULL; void mgmtProcessTranRequest(SSchedMsg *sched) { - int8_t msgType = *(int8_t *) (sched->msg); - int32_t contLen = *(int32_t *) (sched->msg + sizeof(int8_t)); - int8_t *pCont = sched->msg + sizeof(int32_t) + sizeof(int8_t); - void *pConn = sched->thandle; - - (*mgmtProcessShellMsg[msgType])(pCont, contLen, pConn); + SRpcMsg rpcMsg; + rpcMsg.msgType = *(int8_t *) (sched->msg); + rpcMsg.contLen = *(int32_t *) (sched->msg + sizeof(int8_t)); + rpcMsg.pCont = sched->msg + sizeof(int32_t) + sizeof(int8_t); + rpcMsg.handle = sched->thandle; + rpcMsg.code = TSDB_CODE_SUCCESS; + + (*mgmtProcessShellMsg[rpcMsg.msgType])(&rpcMsg); if (sched->msg) { free(sched->msg); } } -void mgmtAddToTranRequest(int8_t type, void *pCont, int contLen, void *ahandle) { +void mgmtAddToTranRequest(SRpcMsg *rpcMsg) { SSchedMsg schedMsg; - schedMsg.msg = malloc(contLen + sizeof(int32_t) + sizeof(int8_t)); + schedMsg.msg = malloc(rpcMsg->contLen + sizeof(int32_t) + sizeof(int8_t)); schedMsg.fp = mgmtProcessTranRequest; schedMsg.tfp = NULL; - schedMsg.thandle = ahandle; - *(int8_t *) (schedMsg.msg) = type; - *(int32_t *) (schedMsg.msg + sizeof(int8_t)) = contLen; - memcpy(schedMsg.msg + sizeof(int32_t) + sizeof(int8_t), pCont, contLen); + schedMsg.thandle = rpcMsg->handle; + *(int8_t *) (schedMsg.msg) = rpcMsg->msgType; + *(int32_t *) (schedMsg.msg + sizeof(int8_t)) = rpcMsg->contLen; + memcpy(schedMsg.msg + sizeof(int32_t) + sizeof(int8_t), rpcMsg->pCont, rpcMsg->contLen); taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } @@ -115,14 +117,20 @@ void mgmtCleanUpShell() { } } -void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { - STableInfoMsg *pInfo = pCont; +void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp; + rpcRsp.handle = rpcMsg->handle; + rpcRsp.pCont = NULL; + rpcRsp.contLen = 0; + + STableInfoMsg *pInfo = rpcMsg->pCont; pInfo->createFlag = htons(pInfo->createFlag); - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("table:%s, failed to get table meta, invalid user", pInfo->tableId); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } @@ -130,19 +138,22 @@ void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { if (pTable == NULL) { if (pInfo->createFlag != 1) { mError("table:%s, failed to get table meta, table not exist", pInfo->tableId); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_TABLE; + rpcSendResponse(&rpcRsp); return; } else { // on demand create table from super table if table does not exists - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create table while get meta info, need redirect message", pInfo->tableId); return; } - SCreateTableMsg *pCreateMsg = rpcMallocCont(sizeof(SCreateTableMsg) + sizeof(STagData)); + int32_t contLen = sizeof(SCreateTableMsg) + sizeof(STagData); + SCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); if (pCreateMsg == NULL) { mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId); - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rpcRsp); return; } @@ -150,31 +161,38 @@ void mgmtProcessTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { strcpy(pCreateMsg->tableId, pInfo->tableId); mError("table:%s, start to create table while get meta info", pInfo->tableId); - mgmtCreateTable(pCreateMsg, contLen, ahandle, true); + mgmtCreateTable(pCreateMsg, contLen, rpcMsg->handle, true); } } else { - mgmtProcessGetTableMeta(pTable, ahandle); + mgmtProcessGetTableMeta(pTable, rpcMsg->handle); } } -void mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp; + rpcRsp.handle = rpcMsg->handle; + rpcRsp.pCont = NULL; + rpcRsp.contLen = 0; + SRpcConnInfo connInfo; - rpcGetConnInfo(ahandle, &connInfo); + rpcGetConnInfo(rpcMsg->handle, &connInfo); bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); SUserObj *pUser = mgmtGetUser(connInfo.user); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SMultiTableInfoMsg *pInfo = pCont; + SMultiTableInfoMsg *pInfo = rpcMsg->pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rpcRsp); return; } @@ -211,43 +229,48 @@ void mgmtProcessMultiTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { } } - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMultiMeta, pMultiMeta->contLen); + rpcRsp.pCont = pMultiMeta; + rpcRsp.contLen = pMultiMeta->contLen; + rpcSendResponse(&rpcRsp); } -void mgmtProcessSuperTableMetaMsg(void *pCont, int32_t contLen, void *ahandle) { - SRpcConnInfo connInfo; - rpcGetConnInfo(ahandle, &connInfo); - -// bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); - - SSuperTableInfoMsg *pInfo = pCont; +void mgmtProcessSuperTableMetaMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SSuperTableInfoMsg *pInfo = rpcMsg->pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_TABLE; + rpcSendResponse(&rpcRsp); return; } SSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, msgLen); + rpcRsp.pCont = pRsp; + rpcRsp.contLen = msgLen; + rpcSendResponse(&rpcRsp); } else { - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_TABLE; + rpcSendResponse(&rpcRsp); } } -void mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SCreateDbMsg *pCreate = (SCreateDbMsg *) pCont; + SCreateDbMsg *pCreate = (SCreateDbMsg *) rpcMsg->pCont; pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -272,21 +295,24 @@ void mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { } } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessAlterDbMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SAlterDbMsg *pAlter = (SAlterDbMsg *) pCont; + SAlterDbMsg *pAlter = (SAlterDbMsg *) rpcMsg->pCont; pAlter->daysPerFile = htonl(pAlter->daysPerFile); pAlter->daysToKeep = htonl(pAlter->daysToKeep); pAlter->maxSessions = htonl(pAlter->maxSessions) + 1; @@ -301,92 +327,104 @@ void mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) { } } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessKillQueryMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SKillQueryMsg *pKill = (SKillQueryMsg *) pCont; + SKillQueryMsg *pKill = (SKillQueryMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillQuery(pKill->queryId, ahandle); + code = mgmtKillQuery(pKill->queryId, rpcMsg->handle); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessKillStreamMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SKillStreamMsg *pKill = (SKillStreamMsg *) pCont; + SKillStreamMsg *pKill = (SKillStreamMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillStream(pKill->queryId, ahandle); + code = mgmtKillStream(pKill->queryId, rpcMsg->handle); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessKillConnectionMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SKillConnectionMsg *pKill = (SKillConnectionMsg *) pCont; + SKillConnectionMsg *pKill = (SKillConnectionMsg *) rpcMsg->pCont; int32_t code; if (!pUser->writeAuth) { code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtKillConnection(pKill->queryId, ahandle); + code = mgmtKillConnection(pKill->queryId, rpcMsg->handle); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessCreateUserMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } int32_t code; if (pUser->superAuth) { - SCreateUserMsg *pCreate = pCont; + SCreateUserMsg *pCreate = rpcMsg->pCont; code = mgmtCreateUser(pUser->pAcct, pCreate->user, pCreate->pass); if (code == TSDB_CODE_SUCCESS) { mLPrint("user:%s is created by %s", pCreate->user, pUser->user); @@ -395,29 +433,34 @@ void mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) { code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pOperUser = mgmtGetUserFromConn(ahandle); + SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); if (pOperUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SAlterUserMsg *pAlter = pCont; + SAlterUserMsg *pAlter = rpcMsg->pCont; SUserObj *pUser = mgmtGetUser(pAlter->user); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -447,7 +490,9 @@ void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(ahandle, code, NULL, 0); + + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); return; } @@ -496,34 +541,40 @@ void mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) { code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(ahandle, code, NULL, 0); + + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); return; } - code = TSDB_CODE_NO_RIGHTS; - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); } -void mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return ; } - SUserObj *pOperUser = mgmtGetUserFromConn(ahandle); + SUserObj *pOperUser = mgmtGetUserFromConn(rpcMsg->handle); if (pOperUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return ; } - SDropUserMsg *pDrop = pCont; + SDropUserMsg *pDrop = rpcMsg->pCont; SUserObj *pUser = mgmtGetUser(pDrop->user); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return ; } if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) { - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return ; } @@ -554,23 +605,26 @@ void mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) { code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessDropDbMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return ; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return ; } int32_t code; if (pUser->superAuth) { - SDropDbMsg *pDrop = pCont; + SDropDbMsg *pDrop = rpcMsg->pCont; code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists); if (code == TSDB_CODE_SUCCESS) { mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user); @@ -579,7 +633,8 @@ void mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { code = TSDB_CODE_NO_RIGHTS; } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } static void mgmtInitShowMsgFp() { @@ -618,10 +673,12 @@ static void mgmtInitShowMsgFp() { mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; } -void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { - SShowMsg *pShowMsg = pCont; +void mgmtProcessShowMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + SShowMsg *pShowMsg = rpcMsg->pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } } @@ -629,7 +686,8 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SShowRsp *pShowRsp = rpcMallocCont(size); if (pShowRsp == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rpcRsp); return; } @@ -649,7 +707,7 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { mgmtSaveQhandle(pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow); - code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, ahandle); + code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->tableMeta, pShow, rpcMsg->handle); if (code == 0) { size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; } else { @@ -659,14 +717,18 @@ void mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { } } - rpcSendResponse(ahandle, code, pShowRsp, size); + rpcRsp.pCont = pShowRsp; + rpcRsp.contLen = size; + rpcSendResponse(&rpcRsp); } -void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { +void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + int32_t rowsToRead = 0; int32_t size = 0; int32_t rowsRead = 0; - SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pCont; + SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) rpcMsg->pCont; pRetrieve->qhandle = htobe64(pRetrieve->qhandle); /* @@ -675,14 +737,16 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { */ if (!mgmtCheckQhandle(pRetrieve->qhandle)) { mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_QHANDLE, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_QHANDLE; + rpcSendResponse(&rpcRsp); return; } SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; if (pShow->signature != (void *)pShow) { mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); - rpcSendResponse(ahandle, TSDB_CODE_MEMORY_CORRUPTED, NULL, 0); + rpcRsp.code = TSDB_CODE_MEMORY_CORRUPTED; + rpcSendResponse(&rpcRsp); return; } else { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -705,7 +769,7 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) - rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, ahandle); + rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, rpcMsg->handle); if (rowsRead < 0) { rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; @@ -716,107 +780,120 @@ void mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { pRsp->numOfRows = htonl(rowsRead); pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, size); + rpcRsp.pCont = pRsp; + rpcRsp.contLen = size; + rpcSendResponse(&rpcRsp); if (rowsToRead == 0) { mgmtFreeQhandle(pShow); } } -void mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { - SCreateTableMsg *pCreate = (SCreateTableMsg *) pCont; +void mgmtProcessCreateTableMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + SCreateTableMsg *pCreate = (SCreateTableMsg *) rpcMsg->pCont; pCreate->numOfColumns = htons(pCreate->numOfColumns); pCreate->numOfTags = htons(pCreate->numOfTags); pCreate->sqlLen = htons(pCreate->sqlLen); - SSchema *pSchema = pCreate->schema; + SSchema *pSchema = (SSchema*) pCreate->schema; for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { pSchema->bytes = htons(pSchema->bytes); pSchema->colId = i; pSchema++; } - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create table, need redirect message", pCreate->tableId); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("table:%s, failed to create table, invalid user", pCreate->tableId); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (!pUser->writeAuth) { mError("table:%s, failed to create table, no rights", pCreate->tableId); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } - int32_t code = mgmtCreateTable(pCreate, contLen, ahandle, false); + int32_t code = mgmtCreateTable(pCreate, rpcMsg->contLen, rpcMsg->handle, false); if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } } -void mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { - SDropTableMsg *pDrop = (SDropTableMsg *) pCont; +void mgmtProcessDropTableMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SDropTableMsg *pDrop = (SDropTableMsg *) rpcMsg->pCont; - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("table:%s, failed to drop table, need redirect message", pDrop->tableId); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("table:%s, failed to drop table, invalid user", pDrop->tableId); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (!pUser->writeAuth) { mError("table:%s, failed to drop table, no rights", pDrop->tableId); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } SDbObj *pDb = mgmtGetDbByTableId(pDrop->tableId); if (pDb == NULL) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); - rpcSendResponse(ahandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); + rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; + rpcSendResponse(&rpcRsp); return; } int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); if (code != TSDB_CODE_ACTION_IN_PROGRESS) { - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } } -void mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SAlterTableMsg *pAlter = (SAlterTableMsg *) pCont; - int32_t code; + SAlterTableMsg *pAlter = (SAlterTableMsg *) rpcMsg->pCont; if (!pUser->writeAuth) { - code = TSDB_CODE_NO_RIGHTS; + rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { pAlter->type = htons(pAlter->type); pAlter->numOfCols = htons(pAlter->numOfCols); if (pAlter->numOfCols > 2) { mError("table:%s error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols); - code = TSDB_CODE_APP_ERROR; + rpcRsp.code = TSDB_CODE_APP_ERROR; } else { SDbObj *pDb = mgmtGetDb(pAlter->db); if (pDb) { @@ -824,61 +901,61 @@ void mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) { pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - code = mgmtAlterTable(pDb, pAlter); - if (code == 0) { + rpcRsp.code = mgmtAlterTable(pDb, pAlter); + if (rpcRsp.code == 0) { mLPrint("table:%s is altered by %s", pAlter->tableId, pUser->user); } } else { - code = TSDB_CODE_DB_NOT_SELECTED; + rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; } } } - if (code != TSDB_CODE_SUCCESS) { - rpcSendResponse(ahandle, code, NULL, 0); - } + rpcSendResponse(&rpcRsp); } -void mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { +void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } - SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; - int32_t code; + SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *) rpcMsg->pCont; if (strcmp(pUser->pAcct->user, "root") != 0) { - code = TSDB_CODE_NO_RIGHTS; + rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { - code = mgmtSendCfgDnodeMsg(pCont); + rpcRsp.code = mgmtSendCfgDnodeMsg(rpcMsg->pCont); } - if (code == TSDB_CODE_SUCCESS) { + if (rpcRsp.code == TSDB_CODE_SUCCESS) { mTrace("dnode:%s is configured by %s", pCfg->ip, pUser->user); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcSendResponse(&rpcRsp); } -void mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { - SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) pCont; +void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SHeartBeatMsg *pHBMsg = (SHeartBeatMsg *) rpcMsg->pCont; mgmtSaveQueryStreamList(pHBMsg); - SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(contLen); + SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *) rpcMallocCont(sizeof(SHeartBeatRsp)); if (pHBRsp == NULL) { - rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); - rpcFreeCont(pCont); + rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rpcRsp); return; } SRpcConnInfo connInfo; - rpcGetConnInfo(ahandle, &connInfo); + rpcGetConnInfo(rpcMsg->handle, &connInfo); pHBRsp->ipList.inUse = 0; pHBRsp->ipList.port = htons(tsMgmtShellPort); @@ -904,7 +981,9 @@ void mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) { pHBRsp->streamId = 0; pHBRsp->killConnection = 0; - rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SHeartBeatMsg)); + rpcRsp.pCont = pHBRsp; + rpcRsp.contLen = sizeof(SHeartBeatRsp); + rpcSendResponse(&rpcRsp); } int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { @@ -922,10 +1001,12 @@ int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, } } -static void mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { - SConnectMsg *pConnectMsg = (SConnectMsg *) pCont; +static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + SConnectMsg *pConnectMsg = (SConnectMsg *) rpcMsg->pCont; + SRpcConnInfo connInfo; - rpcGetConnInfo(thandle, &connInfo); + rpcGetConnInfo(rpcMsg->handle, &connInfo); int32_t code; SUserObj *pUser = mgmtGetUser(connInfo.user); @@ -987,13 +1068,15 @@ static void mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { } connect_over: + rpcRsp.code = code; if (code != TSDB_CODE_SUCCESS) { mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(thandle, code, NULL, 0); } else { mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(thandle, code, pConnectRsp, sizeof(SConnectRsp)); + rpcRsp.pCont = pConnectRsp; + rpcRsp.contLen = sizeof(SConnectRsp); } + rpcSendResponse(&rpcRsp); } /** @@ -1024,48 +1107,51 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { return false; } -static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) { +static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (sdbGetRunStatus() != SDB_STATUS_SERVING) { mTrace("shell msg is ignored since SDB is not ready"); - rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0); - rpcFreeCont(pCont); + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = TSDB_CODE_NOT_READY, .msgType = 0}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(rpcMsg->pCont); return; } - if (mgmtCheckMsgReadOnly(type, pCont)) { - (*mgmtProcessShellMsg[(int8_t)type])(pCont, contLen, ahandle); + if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { + (*mgmtProcessShellMsg[rpcMsg->msgType])(rpcMsg); } else { - if (mgmtProcessShellMsg[(int8_t)type]) { - mgmtAddToTranRequest((int8_t)type, pCont, contLen, ahandle); + if (mgmtProcessShellMsg[rpcMsg->msgType]) { + mgmtAddToTranRequest(rpcMsg); } else { - mError("%s from shell is not processed", taosMsg[(int8_t)type]); + mError("%s from shell is not processed", taosMsg[rpcMsg->msgType]); } } - //TODO free may be cause segment fault - // - // rpcFreeCont(pCont); + rpcFreeCont(rpcMsg->pCont); } void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { + SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb == NULL) { mError("table:%s, failed to create vgroup, db not found", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_DB; + rpcSendResponse(&rpcRsp); return; } SVgObj *pVgroup = mgmtCreateVgroup(pDb); if (pVgroup == NULL) { mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_ENOUGH_DNODES; + rpcSendResponse(&rpcRsp); return; } void *cont = rpcMallocCont(contLen); if (cont == NULL) { mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId); - rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + rpcRsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY; + rpcSendResponse(&rpcRsp); return; } @@ -1087,6 +1173,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta) { assert(pVgroup != NULL); + SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; int32_t sid = taosAllocateId(pVgroup->idPool); if (sid < 0) { @@ -1095,21 +1182,20 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c return; } - int32_t code; STableInfo *pTable; SDCreateTableMsg *pDCreate = NULL; if (pCreate->numOfColumns == 0) { mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + rpcRsp.code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); } else { mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid); - code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); + rpcRsp.code = mgmtCreateNormalTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable); } - if (code != TSDB_CODE_SUCCESS) { + if (rpcRsp.code != TSDB_CODE_SUCCESS) { mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid); - rpcSendResponse(thandle, code, NULL, 0); + rpcSendResponse(&rpcRsp); return; } @@ -1129,10 +1215,12 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c } void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { + SRpcMsg rpcRsp = {.handle = thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SDbObj* pDb = mgmtGetDbByTableId(pTable->tableId); if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { mError("table:%s, failed to get table meta, db not selected", pTable->tableId); - rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0); + rpcRsp.code = TSDB_CODE_DB_NOT_SELECTED; + rpcSendResponse(&rpcRsp); return; } @@ -1141,15 +1229,17 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) { bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); STableMeta *pMeta = rpcMallocCont(sizeof(STableMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); - int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + rpcRsp.code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); - if (code != TSDB_CODE_SUCCESS) { + if (rpcRsp.code != TSDB_CODE_SUCCESS) { rpcFreeCont(pMeta); - rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0); } else { pMeta->contLen = htons(pMeta->contLen); - rpcSendResponse(thandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen); + rpcRsp.pCont = pMeta; + rpcRsp.contLen = pMeta->contLen; } + + rpcSendResponse(&rpcRsp); } static int32_t mgmtCheckRedirectMsgImp(void *pConn) { @@ -1158,20 +1248,26 @@ static int32_t mgmtCheckRedirectMsgImp(void *pConn) { int32_t (*mgmtCheckRedirectMsg)(void *pConn) = mgmtCheckRedirectMsgImp; -static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); +static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = { + .msgType = 0, + .pCont = 0, + .contLen = 0, + .code = TSDB_CODE_OPS_NOT_SUPPORT, + .handle = rpcMsg->handle + }; + rpcSendResponse(&rpcRsp); } -void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; - -static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessAlterAcctMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (!mgmtAlterAcctFp) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT; + rpcSendResponse(&rpcRsp); return; } - SAlterAcctMsg *pAlter = pCont; + SAlterAcctMsg *pAlter = rpcMsg->pCont; pAlter->cfg.maxUsers = htonl(pAlter->cfg.maxUsers); pAlter->cfg.maxDbs = htonl(pAlter->cfg.maxDbs); pAlter->cfg.maxTimeSeries = htonl(pAlter->cfg.maxTimeSeries); @@ -1183,21 +1279,23 @@ static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) pAlter->cfg.maxInbound = htobe64(pAlter->cfg.maxInbound); pAlter->cfg.maxOutbound = htobe64(pAlter->cfg.maxOutbound); - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("account:%s, failed to alter account, need redirect message", pAlter->user); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("account:%s, failed to alter account, invalid user", pAlter->user); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "root") != 0) { mError("account:%s, failed to alter account, no rights", pAlter->user); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -1208,32 +1306,36 @@ static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) mError("account:%s, failed to alter account, reason:%s", pAlter->user, tstrerror(code)); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessDropAcctMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (!mgmtDropAcctFp) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT; + rpcSendResponse(&rpcRsp); return; } - SDropAcctMsg *pDrop = (SDropAcctMsg *) pCont; - - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + SDropAcctMsg *pDrop = (SDropAcctMsg *) rpcMsg->pCont; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("account:%s, failed to drop account, need redirect message", pDrop->user); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("account:%s, failed to drop account, invalid user", pDrop->user); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "root") != 0) { mError("account:%s, failed to drop account, no rights", pDrop->user); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -1244,16 +1346,19 @@ static void mgmtProcessDropAcctMsg(void *pCont, int32_t contLen, void *ahandle) mError("account:%s, failed to drop account, reason:%s", pDrop->user, tstrerror(code)); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessCreateAcctMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (!mgmtCreateAcctFp) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT; + rpcSendResponse(&rpcRsp); return; } - SCreateAcctMsg *pCreate = (SCreateAcctMsg *) pCont; + SCreateAcctMsg *pCreate = (SCreateAcctMsg *) rpcMsg->pCont; pCreate->cfg.maxUsers = htonl(pCreate->cfg.maxUsers); pCreate->cfg.maxDbs = htonl(pCreate->cfg.maxDbs); pCreate->cfg.maxTimeSeries = htonl(pCreate->cfg.maxTimeSeries); @@ -1265,21 +1370,23 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle pCreate->cfg.maxInbound = htobe64(pCreate->cfg.maxInbound); pCreate->cfg.maxOutbound = htobe64(pCreate->cfg.maxOutbound); - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("account:%s, failed to create account, need redirect message", pCreate->user); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("account:%s, failed to create account, invalid user", pCreate->user); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "root") != 0) { mError("account:%s, failed to create account, no rights", pCreate->user); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -1290,31 +1397,36 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle mError("account:%s, failed to create account, reason:%s", pCreate->user, tstrerror(code)); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessCreateDnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (!mgmtCreateDnodeFp) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT; + rpcSendResponse(&rpcRsp); return; } - SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont; - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *) rpcMsg->pCont; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("failed to create dnode:%s, redirect this message", pCreate->ip); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER)); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "root") != 0) { mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -1325,31 +1437,36 @@ static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandl mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code)); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } -static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { +static void mgmtProcessDropDnodeMsg(SRpcMsg *rpcMsg) { + SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (!mgmtDropDnodeByIpFp) { - rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + rpcRsp.code = TSDB_CODE_OPS_NOT_SUPPORT; + rpcSendResponse(&rpcRsp); return; } - SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont; - if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + SDropDnodeMsg *pDrop = (SDropDnodeMsg *) rpcMsg->pCont; + if (mgmtCheckRedirectMsg(rpcMsg->handle) != TSDB_CODE_SUCCESS) { mError("failed to drop dnode:%s, redirect this message", pDrop->ip); return; } - SUserObj *pUser = mgmtGetUserFromConn(ahandle); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER)); - rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + rpcRsp.code = TSDB_CODE_INVALID_USER; + rpcSendResponse(&rpcRsp); return; } if (strcmp(pUser->user, "root") != 0) { mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); - rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + rpcSendResponse(&rpcRsp); return; } @@ -1360,7 +1477,8 @@ static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code)); } - rpcSendResponse(ahandle, code, NULL, 0); + rpcRsp.code = code; + rpcSendResponse(&rpcRsp); } void mgmtInitProcessShellMsg() { @@ -1383,8 +1501,8 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessUnSupportMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessUnSupportMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessUnSupportMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index cc95d9f8cc..ca35de3757 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -133,7 +133,7 @@ int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); assert(pAcct != NULL); - int32_t code = mgmtCheckTableLimit(pAcct, pCreate); + int32_t code = mgmtCheckTableLimit(pAcct, pCreate->numOfColumns); if (code != TSDB_CODE_SUCCESS) { mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId); return code; -- GitLab