diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 9bae38e3a72dddb276679efb38eb7f38e7e1c455..826934c4a87e8832151567a4e2407c66de5c4d4a 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -153,7 +153,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 -enum _mgmt_table { +typedef enum _mgmt_table { + TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_USER, TSDB_MGMT_TABLE_DB, @@ -175,7 +176,7 @@ enum _mgmt_table { TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_FUNCTION, TSDB_MGMT_TABLE_MAX, -}; +} EShowType; #define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 #define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 44ca1d1407f5f5533f07b7d65374043c70d5b389..f3e71564b23b477cb7a344a05c7da46dfffa3bd5 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -157,7 +157,7 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; rpcInit.cfp = dndProcessResponse; - rpcInit.sessions = 8; + rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.user = INTERNAL_USER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9facb8829cebfc98a2a836ae9294b483939d96f4..dc27656dbfd78ad84b956a594c2a95c7e5fbb371 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -281,22 +281,24 @@ typedef struct SFuncObj { int16_t type; } SFuncObj; -typedef struct { - int8_t type; - int8_t maxReplica; - int16_t numOfColumns; - int32_t index; - int32_t rowSize; - int32_t numOfRows; - int32_t numOfReads; - uint16_t payloadLen; - void *pIter; - void *pVgIter; - void **ppShow; - char db[TSDB_FULL_DB_NAME_LEN]; - int16_t offset[TSDB_MAX_COLUMNS]; - int32_t bytes[TSDB_MAX_COLUMNS]; - char payload[]; +typedef struct SShowObj SShowObj; +typedef struct SShowObj { + int8_t type; + int8_t maxReplica; + int16_t numOfColumns; + int32_t id; + int32_t rowSize; + int32_t numOfRows; + int32_t numOfReads; + uint16_t payloadLen; + void *pIter; + void *pVgIter; + SMnode *pMnode; + SShowObj **ppShow; + char db[TSDB_FULL_DB_NAME_LEN]; + int16_t offset[TSDB_MAX_COLUMNS]; + int32_t bytes[TSDB_MAX_COLUMNS]; + char payload[]; } SShowObj; typedef struct SMnodeMsg { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 8910ed4e631ef0b46a1d30c61b84ab41f47b61bb..e5643847a5bdd42ce784b92ebb7a95d7e68bf75b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -18,6 +18,7 @@ #include "mndDef.h" #include "sdb.h" +#include "tcache.h" #include "tqueue.h" #ifdef __cplusplus @@ -27,6 +28,9 @@ extern "C" { typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); +typedef int32_t (*ShowMetaFp)(SMnode *pMnode, STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +typedef int32_t (*ShowRetrieveFp)(SMnode *pMnode, SShowObj *pShow, char *data, int32_t rows, void *pConn); +typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef struct { const char *name; @@ -34,6 +38,14 @@ typedef struct { MndCleanupFp cleanupFp; } SMnodeStep; +typedef struct { + int32_t showId; + ShowMetaFp metaFps[TSDB_MGMT_TABLE_MAX]; + ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX]; + ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX]; + SCacheObj *cache; +} SShowMgmt; + typedef struct SMnode { int32_t dnodeId; int32_t clusterId; @@ -45,6 +57,7 @@ typedef struct SMnode { SSdb *pSdb; SDnode *pDnode; SArray *pSteps; + SShowMgmt showMgmt; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; @@ -53,6 +66,7 @@ typedef struct SMnode { int32_t sver; int32_t statusInterval; int32_t mnodeEqualVnodeNum; + int32_t shellActivityTimer; char *timezone; char *locale; char *charset; diff --git a/source/dnode/mnode/impl/inc/mndShow.h b/source/dnode/mnode/impl/inc/mndShow.h index 06c18cb0297bf4cfdf2eb604049234b667146a6f..31c0b69c85bcba9f4821f1d0758bbfc20067fbce 100644 --- a/source/dnode/mnode/impl/inc/mndShow.h +++ b/source/dnode/mnode/impl/inc/mndShow.h @@ -24,6 +24,10 @@ extern "C" { int32_t mndInitShow(SMnode *pMnode); void mndCleanupShow(SMnode *pMnode); +void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp); +void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp); +void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp); +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 1780c88d6b2ff1e35419212e7f5fc596382bc42d..dbe5dd4f897b2d53999eb5ebcf17316c3efedcec 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndCluster.h" #include "mndTrans.h" +#include "mndShow.h" #define SDB_CLUSTER_VER 1 @@ -94,6 +95,71 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } + +// static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +// int32_t cols = 0; +// SSchema *pSchema = pMeta->schema; + +// pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE; +// pSchema[cols].type = TSDB_DATA_TYPE_BINARY; +// strcpy(pSchema[cols].name, "clusterId"); +// pSchema[cols].bytes = htons(pShow->bytes[cols]); +// cols++; + +// pShow->bytes[cols] = 8; +// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; +// strcpy(pSchema[cols].name, "create_time"); +// pSchema[cols].bytes = htons(pShow->bytes[cols]); +// cols++; + +// pMeta->numOfColumns = htons(cols); +// strcpy(pMeta->tableFname, "show cluster"); +// pShow->numOfColumns = cols; + +// pShow->offset[0] = 0; +// for (int32_t i = 1; i < cols; ++i) { +// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; +// } + +// pShow->numOfRows = 1; +// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + +// return 0; +// } + +// static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +// int32_t numOfRows = 0; +// int32_t cols = 0; +// char * pWrite; +// SClusterObj *pCluster = NULL; + +// while (numOfRows < rows) { +// pShow->pIter = mnodeGetNextCluster(pShow->pIter, &pCluster); +// if (pCluster == NULL) break; + +// cols = 0; + +// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; +// STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->uid, TSDB_CLUSTER_ID_LEN); +// cols++; + +// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; +// *(int64_t *) pWrite = pCluster->createdTime; +// cols++; + +// mnodeDecClusterRef(pCluster); +// numOfRows++; +// } + +// mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); +// pShow->numOfReads += numOfRows; +// return numOfRows; +// } + +// static void mnodeCancelGetNextCluster(void *pIter) { +// sdbFreeIter(tsClusterSdb, pIter); +// } + int32_t mndInitCluster(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_CLUSTER, .keyType = SDB_KEY_INT32, @@ -104,6 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndClusterActionUpdate, .deleteFp = (SdbDeleteFp)mndClusterActionDelete}; + // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta); + // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters); + // mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster); return sdbSetTable(pMnode->pSdb, table); } diff --git a/source/dnode/mnode/impl/src/mndConnect.c b/source/dnode/mnode/impl/src/mndConnect.c new file mode 100644 index 0000000000000000000000000000000000000000..8e62eacfabf06c3c2fe74a8c4020b54d4c2f0b38 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndConnect.c @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndShow.h" + + + +// static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { +// SHeartBeatRsp *pRsp = (SHeartBeatRsp *)rpcMallocCont(sizeof(SHeartBeatRsp)); +// if (pRsp == NULL) { +// return TSDB_CODE_MND_OUT_OF_MEMORY; +// } + +// SHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; +// if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) { +// rpcFreeCont(pRsp); +// return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code +// } + +// SRpcConnInfo connInfo = {0}; +// rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); + +// int32_t connId = htonl(pHBMsg->connId); +// SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); +// if (pConn == NULL) { +// pHBMsg->pid = htonl(pHBMsg->pid); +// pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName); +// } + +// if (pConn == NULL) { +// // do not close existing links, otherwise +// // mError("failed to create connId, close connect"); +// // pRsp->killConnection = 1; +// } else { +// pRsp->connId = htonl(pConn->connId); +// mnodeSaveQueryStreamList(pConn, pHBMsg); + +// if (pConn->killed != 0) { +// pRsp->killConnection = 1; +// } + +// if (pConn->streamId != 0) { +// pRsp->streamId = htonl(pConn->streamId); +// pConn->streamId = 0; +// } + +// if (pConn->queryId != 0) { +// pRsp->queryId = htonl(pConn->queryId); +// pConn->queryId = 0; +// } +// } + +// int32_t onlineDnodes = 0, totalDnodes = 0; +// mnodeGetOnlineAndTotalDnodesNum(&onlineDnodes, &totalDnodes); + +// pRsp->onlineDnodes = htonl(onlineDnodes); +// pRsp->totalDnodes = htonl(totalDnodes); +// mnodeGetMnodeEpSetForShell(&pRsp->epSet, false); + +// pMsg->rpcRsp.rsp = pRsp; +// pMsg->rpcRsp.len = sizeof(SHeartBeatRsp); + +// mnodeReleaseConn(pConn); +// return TSDB_CODE_SUCCESS; +// } + +// static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { +// SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; +// SConnectRsp *pConnectRsp = NULL; +// int32_t code = TSDB_CODE_SUCCESS; + +// SRpcConnInfo connInfo = {0}; +// if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) { +// mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle); +// code = TSDB_CODE_MND_INVALID_CONNECTION; +// goto connect_over; +// } + +// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); +// if (code != TSDB_CODE_SUCCESS) { +// goto connect_over; +// } + +// SUserObj *pUser = pMsg->pUser; +// SAcctObj *pAcct = pUser->pAcct; + +// if (pConnectMsg->db[0]) { +// char dbName[TSDB_TABLE_FNAME_LEN * 3] = {0}; +// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); +// SDbObj *pDb = mnodeGetDb(dbName); +// if (pDb == NULL) { +// code = TSDB_CODE_MND_INVALID_DB; +// goto connect_over; +// } + +// if (pDb->status != TSDB_DB_STATUS_READY) { +// mError("db:%s, status:%d, in dropping", pDb->name, pDb->status); +// code = TSDB_CODE_MND_DB_IN_DROPPING; +// mnodeDecDbRef(pDb); +// goto connect_over; +// } +// mnodeDecDbRef(pDb); +// } + +// pConnectRsp = rpcMallocCont(sizeof(SConnectRsp)); +// if (pConnectRsp == NULL) { +// code = TSDB_CODE_MND_OUT_OF_MEMORY; +// goto connect_over; +// } + +// pConnectMsg->pid = htonl(pConnectMsg->pid); +// SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName); +// if (pConn == NULL) { +// code = terrno; +// } else { +// pConnectRsp->connId = htonl(pConn->connId); +// mnodeReleaseConn(pConn); +// } + +// sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); +// memcpy(pConnectRsp->serverVersion, version, TSDB_VERSION_LEN); +// pConnectRsp->writeAuth = pUser->writeAuth; +// pConnectRsp->superAuth = pUser->superAuth; + +// mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false); + +// dnodeGetClusterId(pConnectRsp->clusterId); + +// connect_over: +// if (code != TSDB_CODE_SUCCESS) { +// if (pConnectRsp) rpcFreeCont(pConnectRsp); +// mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); +// } else { +// mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); +// pMsg->rpcRsp.rsp = pConnectRsp; +// pMsg->rpcRsp.len = sizeof(SConnectRsp); +// } + +// return code; +// } + + diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 3a5a2216775a311215a36410e44bb26789bba39d..0ad144d2dfc0e0d93dde661c507a6bb7c12f31e9 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -18,4 +18,22 @@ #include "mndInt.h" int32_t mndInitDb(SMnode *pMnode) { return 0; } -void mndCleanupDb(SMnode *pMnode) {} \ No newline at end of file +void mndCleanupDb(SMnode *pMnode) {} + + +// static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) { +// SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont; + +// int32_t code = TSDB_CODE_SUCCESS; +// if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db); +// if (pMsg->pDb == NULL) { +// return TSDB_CODE_MND_INVALID_DB; +// } + +// if (pMsg->pDb->status != TSDB_DB_STATUS_READY) { +// mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status); +// return TSDB_CODE_MND_DB_IN_DROPPING; +// } + +// return code; +// } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 9938e95a7325a40103f531fc55c06d18f8da2f76..af6a2030064a09b5ea750ed01681b33b2adca605 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -14,8 +14,314 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndShow.h" -int32_t mndInitShow(SMnode *pMnode) { return 0; } -void mndCleanupShow(SMnode *pMnode) {} \ No newline at end of file +static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg); +static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static bool mndCheckRetrieveFinished(SShowObj *pShow); +static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow); +static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); +static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow); +static void mndFreeShowObj(void *ppShow); +static char *mndShowStr(int32_t showType); + +int32_t mndInitShow(SMnode *pMnode) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + + pMgmt->cache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mndFreeShowObj, "show"); + if (pMgmt->cache == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to alloc show cache since %s", terrstr()); + return -1; + } + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW, mndProcessShowMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW_RETRIEVE, mndProcessRetrieveMsg); + return 0; +} + +void mndCleanupShow(SMnode *pMnode) {} + +static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont; + int8_t type = pMsg->type; + uint16_t payloadLen = htonl(pMsg->payloadLen); + + if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) { + terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; + mError("failed to process show msg since %s", terrstr()); + return -1; + } + + ShowMetaFp metaFp = pMgmt->metaFps[type]; + if (metaFp == NULL) { + terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; + mError("failed to process show-meta msg:%s since no message handle", mndShowStr(type)); + return -1; + } + + int32_t size = sizeof(SShowObj) + payloadLen; + SShowObj *pShow = calloc(1, size); + if (pShow != NULL) { + pShow->pMnode = pMnode; + pShow->type = type; + pShow->payloadLen = payloadLen; + memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN); + memcpy(pShow->payload, pMsg->payload, payloadLen); + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); + return -1; + } + + if (mndPutShowObj(pMnode, pShow) == 0) { + mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); + free(pShow); + return -1; + } + + size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + SShowRsp *pRsp = rpcMallocCont(size); + if (pRsp == NULL) { + mndReleaseShowObj(pShow, true); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:%d, failed to process show-meta msg:%s since malloc rsp error", pShow->id, mndShowStr(type)); + return -1; + } + + pRsp->qhandle = htobe64((uint64_t)pShow); + + int32_t code = (*metaFp)(pMnode, &pRsp->tableMeta, pShow, pMnodeMsg->rpcMsg.handle); + mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type), + pShow->numOfRows, pShow->numOfColumns, tstrerror(code)); + + if (code == TSDB_CODE_SUCCESS) { + pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; + pMnodeMsg->pCont = pRsp; + mndReleaseShowObj(pShow, false); + return TSDB_CODE_SUCCESS; + } else { + rpcFreeCont(pRsp); + mndReleaseShowObj(pShow, true); + return code; + } +} + +static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + int32_t rowsToRead = 0; + int32_t size = 0; + int32_t rowsRead = 0; + + SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont; + pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + + /* + * in case of server restart, apps may hold qhandle created by server before + * restart, which is actually invalid, therefore, signature check is required. + */ + if (mndAcquireShowObj(pMnode, pShow) != 0) { + terrno = TSDB_CODE_MND_INVALID_SHOWOBJ; + mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr()); + return -1; + } + + ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type]; + if (retrieveFp == NULL) { + mndReleaseShowObj(pShow, false); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); + return -1; + } + + mDebug("show:%d, type:%s, start retrieve data, numOfReads:%d numOfRows:%d", pShow->id, mndShowStr(pShow->type), + pShow->numOfReads, pShow->numOfRows); + + if (mndCheckRetrieveFinished(pShow)) { + mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows); + pShow->numOfReads = pShow->numOfRows; + } + + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsToRead = pShow->numOfRows - pShow->numOfReads; + } + + /* return no more than 100 tables in one round trip */ + if (rowsToRead > 100) rowsToRead = 100; + + /* + * the actual number of table may be larger than the value of pShow->numOfRows, if a query is + * issued during a continuous create table operation. Therefore, rowToRead may be less than 0. + */ + if (rowsToRead < 0) rowsToRead = 0; + size = pShow->rowSize * rowsToRead; + + size += 100; + SRetrieveTableRsp *pRsp = rpcMallocCont(size); + if (pRsp == NULL) { + mndReleaseShowObj(pShow, false); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); + return -1; + } + + // if free flag is set, client wants to clean the resources + if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { + rowsRead = (*retrieveFp)(pMnode, pShow, pRsp->data, rowsToRead, pMnodeMsg->rpcMsg.handle); + } + + mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); + + pRsp->numOfRows = htonl(rowsRead); + pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision + + pMnodeMsg->pCont = pRsp; + pMnodeMsg->contLen = size; + + if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { + pRsp->completed = 1; + mDebug("%p, retrieve completed", pShow); + mndReleaseShowObj(pShow, true); + } else { + mDebug("%p, retrieve not completed yet", pShow); + mndReleaseShowObj(pShow, false); + } + + return TSDB_CODE_SUCCESS; +} + +static char *mndShowStr(int32_t showType) { + switch (showType) { + case TSDB_MGMT_TABLE_ACCT: + return "show accounts"; + case TSDB_MGMT_TABLE_USER: + return "show users"; + case TSDB_MGMT_TABLE_DB: + return "show databases"; + case TSDB_MGMT_TABLE_TABLE: + return "show tables"; + case TSDB_MGMT_TABLE_DNODE: + return "show dnodes"; + case TSDB_MGMT_TABLE_MNODE: + return "show mnodes"; + case TSDB_MGMT_TABLE_VGROUP: + return "show vgroups"; + case TSDB_MGMT_TABLE_METRIC: + return "show stables"; + case TSDB_MGMT_TABLE_MODULE: + return "show modules"; + case TSDB_MGMT_TABLE_QUERIES: + return "show queries"; + case TSDB_MGMT_TABLE_STREAMS: + return "show streams"; + case TSDB_MGMT_TABLE_VARIABLES: + return "show configs"; + case TSDB_MGMT_TABLE_CONNS: + return "show connections"; + case TSDB_MGMT_TABLE_SCORES: + return "show scores"; + case TSDB_MGMT_TABLE_GRANTS: + return "show grants"; + case TSDB_MGMT_TABLE_VNODES: + return "show vnodes"; + case TSDB_MGMT_TABLE_CLUSTER: + return "show clusters"; + case TSDB_MGMT_TABLE_STREAMTABLES: + return "show streamtables"; + case TSDB_MGMT_TABLE_TP: + return "show topics"; + default: + return "undefined"; + } +} + +static bool mndCheckRetrieveFinished(SShowObj *pShow) { + if (pShow->pIter == NULL && pShow->numOfReads != 0) { + return true; + } + return false; +} + +static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) { + TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow; + + SShowMgmt *pMgmt = &pMnode->showMgmt; + SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE)); + if (ppShow) { + mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow); + return 0; + } + + return -1; +} + +static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { + SMnode *pMnode = pShow->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + SShowObj **ppShow = (SShowObj **)pShow->ppShow; + taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove); + mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove); +} + +static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000; + + TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow; + pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1); + SShowObj **ppShow = + taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan); + if (ppShow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:%d, failed to put into cache", pShow->id); + return -1; + } + + mTrace("show:%d, data:%p put into cache", pShow->id, ppShow); + return 0; +} + +static void mndFreeShowObj(void *ppShow) { + SShowObj *pShow = *(SShowObj **)ppShow; + SMnode *pMnode = pShow->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + + ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type]; + if (freeFp != NULL) { + if (pShow->pVgIter != NULL) { + // only used in 'show vnodes "ep"' + (*freeFp)(pMnode, pShow->pVgIter); + } + if (pShow->pIter != NULL) { + (*freeFp)(pMnode, pShow->pIter); + } + } + + mDebug("show:%d, data:%p destroyed", pShow->id, ppShow); + tfree(pShow); +} + +void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) { + if (rows < capacity) { + for (int32_t i = 0; i < numOfCols; ++i) { + memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows); + } + } +} + +void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + pMgmt->metaFps[showType] = fp; +} + +void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + pMgmt->retrieveFps[showType] = fp; +} + +void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + pMgmt->freeIterFps[showType] = fp; +}