From ddbf242fa6d8fca8f8cea771a0bffd539db01aea Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 23 Feb 2020 18:04:57 +0800 Subject: [PATCH] process show message --- src/client/src/tscServer.c | 10 +- src/dnode/src/dnodeShell.c | 2 +- src/inc/mnode.h | 2 +- src/inc/taosmsg.h | 13 +-- src/mnode/inc/mgmtConn.h | 4 + src/mnode/src/mgmtConn.c | 10 ++ src/mnode/src/mgmtShell.c | 205 +++++++++++++++---------------------- src/rpc/src/rpcMain.c | 2 +- 8 files changed, 110 insertions(+), 138 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9094e7d8d0..4c3739e4bb 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1205,7 +1205,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(pQueryInfo->type); pSql->cmd.payloadLen = pMsg - pStart; - pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE; + pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -1918,7 +1918,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowTableMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE; + int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for show msg", pSql); return -1; @@ -1951,7 +1951,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strncpy(pShowMsg->payload, pPattern->z, pPattern->n); pShowMsg->payloadLen = htons(pPattern->n); } - pMsg += (sizeof(SShowTableMsg) + pPattern->n); + pMsg += (sizeof(SShowMsg) + pPattern->n); } else { SSQLToken *pIpAddr = &pShowInfo->prefix; assert(pIpAddr->n > 0 && pIpAddr->type > 0); @@ -1959,7 +1959,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); pShowMsg->payloadLen = htons(pIpAddr->n); - pMsg += (sizeof(SShowTableMsg) + pIpAddr->n); + pMsg += (sizeof(SShowMsg) + pIpAddr->n); } pCmd->payloadLen = pMsg - pStart; @@ -2216,7 +2216,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { msgLen = pMsg - pStart; pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE; + pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; return TSDB_CODE_SUCCESS; } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index c7747f112e..0c0fc19325 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -59,7 +59,7 @@ void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, voi if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) { dnodeProcessQueryRequest(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE) { + } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { dnodeProcessRetrieveRequest(pCont, contLen, handle); } else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) { dnodeProcessShellSubmitRequest(pCont, contLen, handle); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 8fb82b2bfc..3882c30268 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -302,6 +302,7 @@ typedef struct { typedef struct { char type; + char db[TSDB_DB_NAME_LEN]; void * pNode; short numOfColumns; int rowSize; @@ -314,7 +315,6 @@ typedef struct { char payload[]; /* payload for wildcard match in show tables */ } SShowObj; - //mgmtSystem int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index ea2c01fcb6..9a65ef2705 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -35,8 +35,8 @@ extern "C" { #define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_DNODE_QUERY 5 #define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7 -#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_RETRIEVE 7 +#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 @@ -269,10 +269,6 @@ typedef struct { int8_t data[]; } SDCreateTableMsg; -typedef struct { - char db[TSDB_DB_NAME_LEN]; -} SShowTableMsg; - typedef struct { char tableId[TSDB_TABLE_ID_LEN]; char db[TSDB_DB_NAME_LEN]; @@ -738,7 +734,8 @@ typedef struct { * payloadLen is the length of payload */ typedef struct { - int8_t type; + int8_t type; + char db[TSDB_DB_NAME_LEN]; uint16_t payloadLen; char payload[]; } SShowMsg; @@ -749,7 +746,7 @@ typedef struct { } SShowRsp; typedef struct { - char ip[20]; + char ip[32]; } SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; typedef struct { diff --git a/src/mnode/inc/mgmtConn.h b/src/mnode/inc/mgmtConn.h index 93de283036..693b86488a 100644 --- a/src/mnode/inc/mgmtConn.h +++ b/src/mnode/inc/mgmtConn.h @@ -25,6 +25,10 @@ extern "C" { int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn); +bool mgmtCheckQhandle(uint64_t qhandle); +void mgmtSaveQhandle(void *qhandle); +void mgmtFreeQhandle(void *qhandle); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtConn.c b/src/mnode/src/mgmtConn.c index 86448b220b..8e0cb48184 100644 --- a/src/mnode/src/mgmtConn.c +++ b/src/mnode/src/mgmtConn.c @@ -141,3 +141,13 @@ int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) { pShow->numOfReads += numOfRows; return numOfRows; } + +bool mgmtCheckQhandle(uint64_t qhandle) { + return true; +} + +void mgmtSaveQhandle(void *qhandle) { +} + +void mgmtFreeQhandle(void *qhandle) { +} \ No newline at end of file diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 140a89a891..3392621cf5 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -910,150 +910,111 @@ static void mgmtInitShowMsgFp() { } int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { - SShowMsg * pShowMsg = (SShowMsg *)pCont; - - STaosRsp * pRsp; - char * pStart; - int32_t code = 0; - SShowRsp *pShowRsp; - SShowObj * pShow = NULL; - + SShowMsg *pShowMsg = pCont; if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_SHOW_RSP) != 0) { - return 0; + if (mgmtCheckRedirectMsg(ahandle) != 0) { + return TSDB_CODE_REDIRECT; } } - int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + - TSDB_EXTRA_PAYLOAD_SIZE; - pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, size); - if (pStart == NULL) { - taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); - return 0; + int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + SShowRsp *pShowRsp = rpcMallocCont(size); + if (pShowRsp == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return TSDB_CODE_SERV_OUT_OF_MEMORY; } - pMsg = pStart; - pRsp = (STaosRsp *)pMsg; - pMsg = (char *)pRsp->more; - + int32_t code; if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { - code = -1; + code = TSDB_CODE_INVALID_MSG_TYPE; } else { - pShow = (SShowObj *)calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); + SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen)); pShow->signature = pShow; - pShow->type = pShowMsg->type; + pShow->type = pShowMsg->type; + strcpy(pShow->db, pShow->db); mTrace("pShow:%p is allocated", pShow); // set the table name query condition pShow->payloadLen = htons(pShowMsg->payloadLen); memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen); - pShowRsp = (SShowRsp *)pMsg; - pShowRsp->qhandle = (uint64_t)pShow; // qhandle; - pConn->qhandle = pShowRsp->qhandle; - - code = (*mgmtGetMetaFp[(uint8_t)pShowMsg->type])(&pShowRsp->meterMeta, pShow, pConn); + mgmtSaveQhandle(pShow); + pShowRsp->qhandle = htobe64((uint64_t) pShow); + code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->meterMeta, pShow, ahandle); if (code == 0) { - pMsg += sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; } else { - mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[(uint8_t)pShowMsg->type], code); + mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, + taosMsg[(uint8_t) pShowMsg->type], code); free(pShow); } } - pRsp->code = code; - msgLen = pMsg - pStart; - taosSendMsgToPeer(pConn->thandle, pStart, msgLen); - - return msgLen; + rpcSendResponse(ahandle, code, pShowRsp, size); + return code; } int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) { -// SRetrieveMeterMsg *pRetrieve; -// SRetrieveMeterRsp *pRsp; -// int32_t rowsToRead = 0, size = 0, rowsRead = 0; -// char * pStart; -// int32_t code = 0; -// SShowObj * pShow; -// -// pRetrieve = (SRetrieveMeterMsg *)pMsg; -// -// /* -// * in case of server restart, apps may hold qhandle created by server before -// * restart, which is actually invalid, therefore, signature check is required. -// */ -// if (pRetrieve->qhandle != pConn->qhandle) { -// mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle); -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); -// return -1; -// } -// -// pShow = (SShowObj *)pRetrieve->qhandle; -// if (pShow->signature != (void *)pShow) { -// mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); -// return -1; -// } else { -// if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { -// rowsToRead = pShow->numOfRows - pShow->numOfReads; -// } -// -// /* return no more than 100 meters in one round trip */ -// if (rowsToRead > 100) rowsToRead = 100; -// -// /* -// * the actual number of table may be larger than the value of pShow->numOfRows, if a query is -// * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. -// */ -// if (rowsToRead < 0) rowsToRead = 0; -// size = pShow->rowSize * rowsToRead; -// } -// -// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, size + 100); -// if (pStart == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// pMsg = pStart; -// -// STaosRsp *pTaosRsp = (STaosRsp *)pStart; -// pTaosRsp->code = code; -// pMsg = pTaosRsp->more; -// -// if (code == 0) { -// pRsp = (SRetrieveMeterRsp *)pMsg; -// pMsg = pRsp->data; -// -// // if free flag is set, client wants to clean the resources -// if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) -// rowsRead = (*mgmtRetrieveFp[(uint8_t)pShow->type])(pShow, pRsp->data, rowsToRead, pConn); -// -// if (rowsRead < 0) { -// rowsRead = 0; -// pTaosRsp->code = TSDB_CODE_ACTION_IN_PROGRESS; -// } -// -// pRsp->numOfRows = htonl(rowsRead); -// pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision -// pMsg += size; -// } -// -// msgLen = pMsg - pStart; -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); -// -// if (rowsToRead == 0) { -// uintptr_t oldSign = (uintptr_t)atomic_val_compare_exchange_ptr(&pShow->signature, pShow, 0); -// if (oldSign != (uintptr_t)pShow) { -// return msgLen; -// } -// // pShow->signature = 0; -// mTrace("pShow:%p is released", pShow); -// tfree(pShow); -// } -// -// return msgLen; - return 0; + int32_t rowsToRead = 0; + int32_t size = 0; + int32_t rowsRead = 0; + SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pCont; + pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + + /* + * in case of server restart, apps may hold qhandle created by server before + * restart, which is actually invalid, therefore, signature check is required. + */ + if (!mgmtCheckQhandle(pRetrieve->qhandle)) { + mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_QHANDLE, NULL, 0); + return TSDB_CODE_INVALID_QHANDLE; + } + + SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + if (pShow->signature != (void *)pShow) { + mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); + rpcSendResponse(ahandle, TSDB_CODE_MEMORY_CORRUPTED, NULL, 0); + return TSDB_CODE_MEMORY_CORRUPTED; + } else { + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsToRead = pShow->numOfRows - pShow->numOfReads; + } + + /* return no more than 100 meters in one round trip */ + if (rowsToRead > 100) rowsToRead = 100; + + /* + * the actual number of table may be larger than the value of pShow->numOfRows, if a query is + * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. + */ + if (rowsToRead < 0) rowsToRead = 0; + size = pShow->rowSize * rowsToRead; + } + + size += 100; + SRetrieveMeterRsp *pRsp = rpcMallocCont(size); + + // if free flag is set, client wants to clean the resources + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) + rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, ahandle); + + if (rowsRead < 0) { + rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS; + rpcFreeCont(pRsp); + return TSDB_CODE_ACTION_IN_PROGRESS; + } + + pRsp->numOfRows = htonl(rowsRead); + pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision + + rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, size); + + if (rowsToRead == 0) { + mgmtFreeQhandle(pShow); + } + + return TSDB_CODE_SUCCESS; } int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) { @@ -1364,7 +1325,7 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) { static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { if ((type == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || - type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || + type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_RETRIEVE || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_CONNECT) { return true; @@ -1419,7 +1380,7 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index caa1ea5c75..1eb95aaf36 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -338,7 +338,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection - if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE || + if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_SHOW ) pContext->connType = RPC_CONN_TCPC; -- GitLab