提交 6d7388de 编写于 作者: S Shengliang Guan

TD-10431 process show msg

上级 a117cb0d
...@@ -153,7 +153,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) ...@@ -153,7 +153,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
enum _mgmt_table { typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_ACCT,
TSDB_MGMT_TABLE_USER, TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_DB, TSDB_MGMT_TABLE_DB,
...@@ -175,7 +176,7 @@ enum _mgmt_table { ...@@ -175,7 +176,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_TP,
TSDB_MGMT_TABLE_FUNCTION, TSDB_MGMT_TABLE_FUNCTION,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; } EShowType;
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 #define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 #define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
......
...@@ -157,7 +157,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -157,7 +157,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = 8; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = INTERNAL_USER; rpcInit.user = INTERNAL_USER;
......
...@@ -281,22 +281,24 @@ typedef struct SFuncObj { ...@@ -281,22 +281,24 @@ typedef struct SFuncObj {
int16_t type; int16_t type;
} SFuncObj; } SFuncObj;
typedef struct { typedef struct SShowObj SShowObj;
int8_t type; typedef struct SShowObj {
int8_t maxReplica; int8_t type;
int16_t numOfColumns; int8_t maxReplica;
int32_t index; int16_t numOfColumns;
int32_t rowSize; int32_t id;
int32_t numOfRows; int32_t rowSize;
int32_t numOfReads; int32_t numOfRows;
uint16_t payloadLen; int32_t numOfReads;
void *pIter; uint16_t payloadLen;
void *pVgIter; void *pIter;
void **ppShow; void *pVgIter;
char db[TSDB_FULL_DB_NAME_LEN]; SMnode *pMnode;
int16_t offset[TSDB_MAX_COLUMNS]; SShowObj **ppShow;
int32_t bytes[TSDB_MAX_COLUMNS]; char db[TSDB_FULL_DB_NAME_LEN];
char payload[]; int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
} SShowObj; } SShowObj;
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mndDef.h" #include "mndDef.h"
#include "sdb.h" #include "sdb.h"
#include "tcache.h"
#include "tqueue.h" #include "tqueue.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -27,6 +28,9 @@ extern "C" { ...@@ -27,6 +28,9 @@ extern "C" {
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg); typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnode *pMnode, STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*ShowRetrieveFp)(SMnode *pMnode, SShowObj *pShow, char *data, int32_t rows, void *pConn);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct { typedef struct {
const char *name; const char *name;
...@@ -34,6 +38,14 @@ typedef struct { ...@@ -34,6 +38,14 @@ typedef struct {
MndCleanupFp cleanupFp; MndCleanupFp cleanupFp;
} SMnodeStep; } SMnodeStep;
typedef struct {
int32_t showId;
ShowMetaFp metaFps[TSDB_MGMT_TABLE_MAX];
ShowRetrieveFp retrieveFps[TSDB_MGMT_TABLE_MAX];
ShowFreeIterFp freeIterFps[TSDB_MGMT_TABLE_MAX];
SCacheObj *cache;
} SShowMgmt;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -45,6 +57,7 @@ typedef struct SMnode { ...@@ -45,6 +57,7 @@ typedef struct SMnode {
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
SArray *pSteps; SArray *pSteps;
SShowMgmt showMgmt;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
...@@ -53,6 +66,7 @@ typedef struct SMnode { ...@@ -53,6 +66,7 @@ typedef struct SMnode {
int32_t sver; int32_t sver;
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum; int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone; char *timezone;
char *locale; char *locale;
char *charset; char *charset;
......
...@@ -24,6 +24,10 @@ extern "C" { ...@@ -24,6 +24,10 @@ extern "C" {
int32_t mndInitShow(SMnode *pMnode); int32_t mndInitShow(SMnode *pMnode);
void mndCleanupShow(SMnode *pMnode); void mndCleanupShow(SMnode *pMnode);
void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndShow.h"
#define SDB_CLUSTER_VER 1 #define SDB_CLUSTER_VER 1
...@@ -94,6 +95,71 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -94,6 +95,71 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw); return sdbWrite(pMnode->pSdb, pRaw);
} }
// static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
// SSchema *pSchema = pMeta->schema;
// pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "clusterId");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "create_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// pMeta->numOfColumns = htons(cols);
// strcpy(pMeta->tableFname, "show cluster");
// pShow->numOfColumns = cols;
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) {
// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// }
// pShow->numOfRows = 1;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
// return 0;
// }
// static int32_t mnodeRetrieveClusters(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
// int32_t numOfRows = 0;
// int32_t cols = 0;
// char * pWrite;
// SClusterObj *pCluster = NULL;
// while (numOfRows < rows) {
// pShow->pIter = mnodeGetNextCluster(pShow->pIter, &pCluster);
// if (pCluster == NULL) break;
// cols = 0;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->uid, TSDB_CLUSTER_ID_LEN);
// cols++;
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *) pWrite = pCluster->createdTime;
// cols++;
// mnodeDecClusterRef(pCluster);
// numOfRows++;
// }
// mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
// pShow->numOfReads += numOfRows;
// return numOfRows;
// }
// static void mnodeCancelGetNextCluster(void *pIter) {
// sdbFreeIter(tsClusterSdb, pIter);
// }
int32_t mndInitCluster(SMnode *pMnode) { int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER, SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT32, .keyType = SDB_KEY_INT32,
...@@ -104,6 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) { ...@@ -104,6 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndClusterActionUpdate, .updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete}; .deleteFp = (SdbDeleteFp)mndClusterActionDelete};
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters);
// mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndShow.h"
// static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
// SHeartBeatRsp *pRsp = (SHeartBeatRsp *)rpcMallocCont(sizeof(SHeartBeatRsp));
// if (pRsp == NULL) {
// return TSDB_CODE_MND_OUT_OF_MEMORY;
// }
// SHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
// if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
// rpcFreeCont(pRsp);
// return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
// }
// SRpcConnInfo connInfo = {0};
// rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
// int32_t connId = htonl(pHBMsg->connId);
// SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
// if (pConn == NULL) {
// pHBMsg->pid = htonl(pHBMsg->pid);
// pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
// }
// if (pConn == NULL) {
// // do not close existing links, otherwise
// // mError("failed to create connId, close connect");
// // pRsp->killConnection = 1;
// } else {
// pRsp->connId = htonl(pConn->connId);
// mnodeSaveQueryStreamList(pConn, pHBMsg);
// if (pConn->killed != 0) {
// pRsp->killConnection = 1;
// }
// if (pConn->streamId != 0) {
// pRsp->streamId = htonl(pConn->streamId);
// pConn->streamId = 0;
// }
// if (pConn->queryId != 0) {
// pRsp->queryId = htonl(pConn->queryId);
// pConn->queryId = 0;
// }
// }
// int32_t onlineDnodes = 0, totalDnodes = 0;
// mnodeGetOnlineAndTotalDnodesNum(&onlineDnodes, &totalDnodes);
// pRsp->onlineDnodes = htonl(onlineDnodes);
// pRsp->totalDnodes = htonl(totalDnodes);
// mnodeGetMnodeEpSetForShell(&pRsp->epSet, false);
// pMsg->rpcRsp.rsp = pRsp;
// pMsg->rpcRsp.len = sizeof(SHeartBeatRsp);
// mnodeReleaseConn(pConn);
// return TSDB_CODE_SUCCESS;
// }
// static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
// SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
// SConnectRsp *pConnectRsp = NULL;
// int32_t code = TSDB_CODE_SUCCESS;
// SRpcConnInfo connInfo = {0};
// if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) {
// mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle);
// code = TSDB_CODE_MND_INVALID_CONNECTION;
// goto connect_over;
// }
// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3);
// if (code != TSDB_CODE_SUCCESS) {
// goto connect_over;
// }
// SUserObj *pUser = pMsg->pUser;
// SAcctObj *pAcct = pUser->pAcct;
// if (pConnectMsg->db[0]) {
// char dbName[TSDB_TABLE_FNAME_LEN * 3] = {0};
// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
// SDbObj *pDb = mnodeGetDb(dbName);
// if (pDb == NULL) {
// code = TSDB_CODE_MND_INVALID_DB;
// goto connect_over;
// }
// if (pDb->status != TSDB_DB_STATUS_READY) {
// mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
// code = TSDB_CODE_MND_DB_IN_DROPPING;
// mnodeDecDbRef(pDb);
// goto connect_over;
// }
// mnodeDecDbRef(pDb);
// }
// pConnectRsp = rpcMallocCont(sizeof(SConnectRsp));
// if (pConnectRsp == NULL) {
// code = TSDB_CODE_MND_OUT_OF_MEMORY;
// goto connect_over;
// }
// pConnectMsg->pid = htonl(pConnectMsg->pid);
// SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName);
// if (pConn == NULL) {
// code = terrno;
// } else {
// pConnectRsp->connId = htonl(pConn->connId);
// mnodeReleaseConn(pConn);
// }
// sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
// memcpy(pConnectRsp->serverVersion, version, TSDB_VERSION_LEN);
// pConnectRsp->writeAuth = pUser->writeAuth;
// pConnectRsp->superAuth = pUser->superAuth;
// mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false);
// dnodeGetClusterId(pConnectRsp->clusterId);
// connect_over:
// if (code != TSDB_CODE_SUCCESS) {
// if (pConnectRsp) rpcFreeCont(pConnectRsp);
// mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
// } else {
// mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
// pMsg->rpcRsp.rsp = pConnectRsp;
// pMsg->rpcRsp.len = sizeof(SConnectRsp);
// }
// return code;
// }
...@@ -18,4 +18,22 @@ ...@@ -18,4 +18,22 @@
#include "mndInt.h" #include "mndInt.h"
int32_t mndInitDb(SMnode *pMnode) { return 0; } int32_t mndInitDb(SMnode *pMnode) { return 0; }
void mndCleanupDb(SMnode *pMnode) {} void mndCleanupDb(SMnode *pMnode) {}
\ No newline at end of file
// static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
// SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont;
// int32_t code = TSDB_CODE_SUCCESS;
// if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db);
// if (pMsg->pDb == NULL) {
// return TSDB_CODE_MND_INVALID_DB;
// }
// if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
// mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
// return TSDB_CODE_MND_DB_IN_DROPPING;
// }
// return code;
// }
...@@ -14,8 +14,314 @@ ...@@ -14,8 +14,314 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mndShow.h"
#include "mndInt.h"
int32_t mndInitShow(SMnode *pMnode) { return 0; } static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg);
void mndCleanupShow(SMnode *pMnode) {} static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMsg);
\ No newline at end of file static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndFreeShowObj(void *ppShow);
static char *mndShowStr(int32_t showType);
int32_t mndInitShow(SMnode *pMnode) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->cache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mndFreeShowObj, "show");
if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc show cache since %s", terrstr());
return -1;
}
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW, mndProcessShowMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_SHOW_RETRIEVE, mndProcessRetrieveMsg);
return 0;
}
void mndCleanupShow(SMnode *pMnode) {}
static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont;
int8_t type = pMsg->type;
uint16_t payloadLen = htonl(pMsg->payloadLen);
if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show msg since %s", terrstr());
return -1;
}
ShowMetaFp metaFp = pMgmt->metaFps[type];
if (metaFp == NULL) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show-meta msg:%s since no message handle", mndShowStr(type));
return -1;
}
int32_t size = sizeof(SShowObj) + payloadLen;
SShowObj *pShow = calloc(1, size);
if (pShow != NULL) {
pShow->pMnode = pMnode;
pShow->type = type;
pShow->payloadLen = payloadLen;
memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN);
memcpy(pShow->payload, pMsg->payload, payloadLen);
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
return -1;
}
if (mndPutShowObj(pMnode, pShow) == 0) {
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
free(pShow);
return -1;
}
size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj(pShow, true);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to process show-meta msg:%s since malloc rsp error", pShow->id, mndShowStr(type));
return -1;
}
pRsp->qhandle = htobe64((uint64_t)pShow);
int32_t code = (*metaFp)(pMnode, &pRsp->tableMeta, pShow, pMnodeMsg->rpcMsg.handle);
mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type),
pShow->numOfRows, pShow->numOfColumns, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
pMnodeMsg->pCont = pRsp;
mndReleaseShowObj(pShow, false);
return TSDB_CODE_SUCCESS;
} else {
rpcFreeCont(pRsp);
mndReleaseShowObj(pShow, true);
return code;
}
}
static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if (mndAcquireShowObj(pMnode, pShow) != 0) {
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr());
return -1;
}
ShowRetrieveFp retrieveFp = pMgmt->retrieveFps[pShow->type];
if (retrieveFp == NULL) {
mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
mDebug("show:%d, type:%s, start retrieve data, numOfReads:%d numOfRows:%d", pShow->id, mndShowStr(pShow->type),
pShow->numOfReads, pShow->numOfRows);
if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows);
pShow->numOfReads = pShow->numOfRows;
}
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
/* return no more than 100 tables in one round trip */
if (rowsToRead > 100) rowsToRead = 100;
/*
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
*/
if (rowsToRead < 0) rowsToRead = 0;
size = pShow->rowSize * rowsToRead;
size += 100;
SRetrieveTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj(pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pMnode, pShow, pRsp->data, rowsToRead, pMnodeMsg->rpcMsg.handle);
}
mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
pMnodeMsg->pCont = pRsp;
pMnodeMsg->contLen = size;
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1;
mDebug("%p, retrieve completed", pShow);
mndReleaseShowObj(pShow, true);
} else {
mDebug("%p, retrieve not completed yet", pShow);
mndReleaseShowObj(pShow, false);
}
return TSDB_CODE_SUCCESS;
}
static char *mndShowStr(int32_t showType) {
switch (showType) {
case TSDB_MGMT_TABLE_ACCT:
return "show accounts";
case TSDB_MGMT_TABLE_USER:
return "show users";
case TSDB_MGMT_TABLE_DB:
return "show databases";
case TSDB_MGMT_TABLE_TABLE:
return "show tables";
case TSDB_MGMT_TABLE_DNODE:
return "show dnodes";
case TSDB_MGMT_TABLE_MNODE:
return "show mnodes";
case TSDB_MGMT_TABLE_VGROUP:
return "show vgroups";
case TSDB_MGMT_TABLE_METRIC:
return "show stables";
case TSDB_MGMT_TABLE_MODULE:
return "show modules";
case TSDB_MGMT_TABLE_QUERIES:
return "show queries";
case TSDB_MGMT_TABLE_STREAMS:
return "show streams";
case TSDB_MGMT_TABLE_VARIABLES:
return "show configs";
case TSDB_MGMT_TABLE_CONNS:
return "show connections";
case TSDB_MGMT_TABLE_SCORES:
return "show scores";
case TSDB_MGMT_TABLE_GRANTS:
return "show grants";
case TSDB_MGMT_TABLE_VNODES:
return "show vnodes";
case TSDB_MGMT_TABLE_CLUSTER:
return "show clusters";
case TSDB_MGMT_TABLE_STREAMTABLES:
return "show streamtables";
case TSDB_MGMT_TABLE_TP:
return "show topics";
default:
return "undefined";
}
}
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
if (pShow->pIter == NULL && pShow->numOfReads != 0) {
return true;
}
return false;
}
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE));
if (ppShow) {
mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow);
return 0;
}
return -1;
}
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowObj **ppShow = (SShowObj **)pShow->ppShow;
taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove);
mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove);
}
static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000;
TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow;
pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1);
SShowObj **ppShow =
taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan);
if (ppShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:%d, failed to put into cache", pShow->id);
return -1;
}
mTrace("show:%d, data:%p put into cache", pShow->id, ppShow);
return 0;
}
static void mndFreeShowObj(void *ppShow) {
SShowObj *pShow = *(SShowObj **)ppShow;
SMnode *pMnode = pShow->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
ShowFreeIterFp freeFp = pMgmt->freeIterFps[pShow->type];
if (freeFp != NULL) {
if (pShow->pVgIter != NULL) {
// only used in 'show vnodes "ep"'
(*freeFp)(pMnode, pShow->pVgIter);
}
if (pShow->pIter != NULL) {
(*freeFp)(pMnode, pShow->pIter);
}
}
mDebug("show:%d, data:%p destroyed", pShow->id, ppShow);
tfree(pShow);
}
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->metaFps[showType] = fp;
}
void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->retrieveFps[showType] = fp;
}
void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->freeIterFps[showType] = fp;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册