提交 bcbc8618 编写于 作者: H Haojun Liao

ehn(query): refactor show module in mnode.

上级 60c1ce2d
......@@ -203,7 +203,7 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
......@@ -211,10 +211,6 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows);
}
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
int8_t compressed) {
int32_t colSize = colDataGetLength(pColRes, numOfRows);
......
......@@ -953,9 +953,14 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN];
int64_t showId;
// int8_t free;
} SRetrieveTableReq;
typedef struct SSysTableSchema {
int8_t type;
col_id_t colId;
int32_t bytes;
} SSysTableSchema;
int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
......
......@@ -24,7 +24,6 @@ extern "C" {
#include "types.h"
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT;
typedef uint32_t TDRowLenT;
typedef uint8_t TDRowValT;
typedef uint64_t TDRowVerT;
......
......@@ -388,10 +388,11 @@ typedef struct {
int32_t payloadLen;
void* pIter;
SMnode* pMnode;
STableMetaRsp* pMeta;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
} SShowObj;
typedef struct {
......
......@@ -41,8 +41,7 @@ extern "C" {
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SNodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct SMnodeLoad {
......@@ -98,7 +97,7 @@ typedef struct {
int64_t timeseriesAllowed;
} SGrantInfo;
typedef struct SMnode {
struct SMnode {
int32_t selfId;
int64_t clusterId;
int8_t replica;
......@@ -122,7 +121,7 @@ typedef struct SMnode {
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
} SMnode;
};
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
int64_t mndGenerateUid(char *name, int32_t len);
......
......@@ -33,8 +33,7 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter);
int32_t mndInitBnode(SMnode *pMnode) {
......@@ -438,38 +437,34 @@ static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SBnodeObj *pObj = NULL;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_BNODE, pShow->pIter, (void **)&pObj);
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pObj->id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pObj->id;
cols++;
char buf[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->pDnode->ep, pShow->bytes[cols]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, buf, false);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pObj->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pObj->createdTime, false);
numOfRows++;
sdbRelease(pSdb, pObj);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -26,7 +26,7 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
int32_t mndInitCluster(SMnode *pMnode) {
......@@ -178,12 +178,11 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pMsg->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
char *pWrite;
SClusterObj *pCluster = NULL;
while (numOfRows < rows) {
......@@ -191,24 +190,22 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, char *data,
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pCluster->id;
cols++;
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->bytes[cols]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pCluster->name, TSDB_CLUSTER_ID_LEN);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, buf, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pCluster->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->createdTime, false);
sdbRelease(pSdb, pCluster);
numOfRows++;
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -35,7 +35,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
int32_t mndInitConsumer(SMnode *pMnode) {
......
......@@ -38,7 +38,7 @@ static int32_t mndProcessDropDbReq(SNodeMsg *pReq);
static int32_t mndProcessUseDbReq(SNodeMsg *pReq);
static int32_t mndProcessSyncDbReq(SNodeMsg *pReq);
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rowsCapacity);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
......@@ -1351,90 +1351,76 @@ char *mnGetDbStr(char *src) {
return pos;
}
static char *getDataPosition(char *pData, SShowObj *pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) {
return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows;
}
static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_t rows, int32_t rowCapacity,
int64_t numOfTables) {
static void dumpDbInfoData(SSDataBlock* pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables) {
int32_t cols = 0;
char *pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char* buf = taosMemoryMalloc(pShow->bytes[cols]);
char *name = mnGetDbStr(pDb->name);
if (name != NULL) {
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(buf, name, pShow->bytes[cols]);
} else {
STR_TO_VARSTR(pWrite, "NULL");
// STR_TO_VARSTR(pWrite, "NULL");
ASSERT(0);
}
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int64_t *)pWrite = pDb->createdTime;
cols++;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, buf, false);
taosMemoryFree(buf);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->createdTime, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int64_t *)pWrite = numOfTables;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfVgroups, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.replications;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int16_t *)pWrite = pDb->cfg.quorum;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.daysPerFile;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.quorum, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char tmp[128] = {0};
int32_t len = 0;
if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
len = sprintf(&tmp[2], "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0);
} else {
sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
len = sprintf(&tmp[2], "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2);
}
STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp));
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.cacheBlockSize;
cols++;
varDataSetLen(tmp, len);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)tmp, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheBlockSize, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.totalBlocks;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.totalBlocks, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.minRows;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.minRows, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.maxRows;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.maxRows, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.walLevel;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.walLevel, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.fsyncPeriod;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.fsyncPeriod, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.compression;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char *prec = NULL;
switch (pDb->cfg.precision) {
case TSDB_TIME_PRECISION_MILLI:
......@@ -1450,28 +1436,31 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_
prec = "none";
break;
}
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int32_t *)pWrite = pDb->cfg.ttl;
cols++;
char t[10] = {0};
STR_WITH_SIZE_TO_VARSTR(t, prec, 2);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)t, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.singleSTable;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.ttl, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
*(int8_t *)pWrite = pDb->cfg.streamMode;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.singleSTable, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.streamMode, false);
pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
char *status = "ready";
STR_WITH_SIZE_TO_VARSTR(pWrite, status, strlen(status));
cols++;
char b[24] = {0};
STR_WITH_SIZE_TO_VARSTR(b, status, strlen(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)b, false);
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
static void setInformationSchemaDbCfg(SDbObj *pDbObj) {
......@@ -1506,12 +1495,28 @@ static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, v
return true;
}
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) {
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
// Append the information_schema database into the result.
if (!pShow->sysDbRsp) {
SDbObj infoschemaDb = {0};
setInformationSchemaDbCfg(&infoschemaDb);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14);
numOfRows += 1;
SDbObj perfschemaDb = {0};
setPerfSchemaDbCfg(&perfschemaDb);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3);
numOfRows += 1;
pShow->sysDbRsp = true;
}
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
if (pShow->pIter == NULL) {
......@@ -1521,25 +1526,11 @@ static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity, numOfTables);
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables);
numOfRows++;
sdbRelease(pSdb, pDb);
}
// Append the information_schema database into the result.
if (numOfRows + 2 < rowsCapacity) {
SDbObj infoschemaDb = {0};
setInformationSchemaDbCfg(&infoschemaDb);
dumpDbInfoToPayload(data, &infoschemaDb, pShow, numOfRows, rowsCapacity, 14);
numOfRows += 1;
SDbObj perfschemaDb = {0};
setPerfSchemaDbCfg(&perfschemaDb);
dumpDbInfoToPayload(data, &perfschemaDb, pShow, numOfRows, rowsCapacity, 14);
numOfRows += 1;
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -56,9 +56,9 @@ static int32_t mndProcessConfigDnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessStatusReq(SNodeMsg *pReq);
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
int32_t mndInitDnode(SMnode *pMnode) {
......@@ -669,7 +669,7 @@ static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
return 0;
}
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
int32_t totalRows = 0;
int32_t numOfRows = 0;
......@@ -694,34 +694,36 @@ static int32_t mndRetrieveConfigs(SNodeMsg *pReq, SShowObj *pShow, char *data, i
snprintf(cfgVals[totalRows], TSDB_CONIIG_VALUE_LEN, "%s", tsCharset);
totalRows++;
char buf[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
char bufVal[TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < totalRows; i++) {
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
cols++;
STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) buf, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, cfgVals[i], TSDB_CONIIG_VALUE_LEN);
cols++;
STR_WITH_MAXSIZE_TO_VARSTR(bufVal, cfgVals[i], TSDB_CONIIG_VALUE_LEN);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) bufVal, false);
numOfRows++;
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SDnodeObj *pDnode = NULL;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
......@@ -731,39 +733,41 @@ static int32_t mndRetrieveDnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->id;
cols++;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pDnode->id, false);
char buf[tListLen(pDnode->ep) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, buf, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int16_t id = mndGetVnodesNum(pMnode, pDnode->id);
colDataAppend(pColInfo, numOfRows, (const char*) &id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->ep, pShow->bytes[cols]);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->numOfSupportVnodes, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = mndGetVnodesNum(pMnode, pDnode->id);
cols++;
char b1[9] = {0};
STR_TO_VARSTR(b1, online? "ready":"offline");
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->numOfSupportVnodes;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, online ? "ready" : "offline");
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pDnode->createdTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pDnode->createdTime;
cols++;
char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b, false);
numOfRows++;
sdbRelease(pSdb, pDnode);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -34,8 +34,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SNodeMsg *pReq, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq);
static int32_t mndProcessDropFuncReq(SNodeMsg *pReq);
static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq);
static int32_t mndGetFuncMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
int32_t mndInitFunc(SMnode *pMnode) {
......@@ -462,70 +461,6 @@ RETRIEVE_FUNC_OVER:
return code;
}
static int32_t mndGetFuncMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "comment");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "aggregate");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "outputtype");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "code_len");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "bufsize");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t len) {
char *msg = "unknown";
if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
......@@ -544,13 +479,12 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le
return tDataTypes[type].name;
}
static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SFuncObj *pFunc = NULL;
int32_t cols = 0;
char *pWrite;
char buf[TSDB_TYPE_STR_MAX_LEN];
while (numOfRows < rows) {
......@@ -559,40 +493,42 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, char *data, int
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->name, pShow->bytes[cols]);
cols++;
char b1[tListLen(pFunc->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b1, pFunc->name, pShow->bytes[cols]);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) b1, false);
char* b2 = taosMemoryCalloc(1, pShow->bytes[cols]);
STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) b2, false);
int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pFunc->pComment, pShow->bytes[cols]);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &isAgg, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE ? 1 : 0;
cols++;
char b3[TSDB_TYPE_STR_MAX_LEN] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b3, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen), pShow->bytes[cols]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen),
pShow->bytes[cols]);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) b3, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pFunc->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pFunc->createdTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pFunc->codeSize;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pFunc->codeSize, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pFunc->bufSize;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pFunc->bufSize, false);
numOfRows++;
sdbRelease(pSdb, pFunc);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -35,7 +35,7 @@ static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
int32_t mndInitMnode(SMnode *pMnode) {
......@@ -615,7 +615,7 @@ static int32_t mndProcessDropMnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -628,34 +628,32 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pObj->id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pObj->id;
cols++;
char b1[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b1, pObj->pDnode->ep, pShow->bytes[cols]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
const char *roles = mndGetRoleStr(pObj->role);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]);
cols++;
char* b2 = taosMemoryCalloc(1, strlen(roles) + VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) b2, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pObj->roleTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->roleTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pObj->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
numOfRows++;
sdbRelease(pSdb, pObj);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -77,9 +77,9 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
return 0;
......@@ -903,7 +903,6 @@ static int32_t mndRetrieveQueries(SNodeMsg *pReq, SShowObj *pShow, char *data, i
}
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -34,7 +34,7 @@ static int32_t mndProcessCreateQnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessQnodeListReq(SNodeMsg *pReq);
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
int32_t mndInitQnode(SMnode *pMnode) {
......@@ -497,7 +497,7 @@ static int32_t mndProcessDropQnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -510,25 +510,21 @@ static int32_t mndRetrieveQnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*)&pObj->id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pObj->id;
cols++;
char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)ep, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pObj->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
numOfRows++;
sdbRelease(pSdb, pObj);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -60,7 +60,6 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) {
showObj.type = pReq->type;
showObj.payloadLen = pReq->payloadLen;
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
memcpy(showObj.payload, pReq->payload, pReq->payloadLen);
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
......@@ -140,13 +139,14 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
return -1;
}
STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
pShow->pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
pShow->numOfColumns = pShow->pMeta->numOfColumns;
int32_t offset = 0;
for(int32_t i = 0; i < meta->numOfColumns; ++i) {
pShow->numOfColumns = meta->numOfColumns;
for(int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
pShow->offset[i] = offset;
int32_t bytes = meta->pSchemas[i].bytes;
int32_t bytes = pShow->pMeta->pSchemas[i].bytes;
pShow->rowSize += bytes;
pShow->bytes[i] = bytes;
offset += bytes;
......@@ -170,45 +170,83 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%s", pShow->id, mndShowStr(pShow->type));
/*
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
*/
size = pShow->rowSize * rowsToRead;
int32_t numOfCols = pShow->pMeta->numOfColumns;
size += SHOW_STEP_SIZE;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj((SShowObj*) pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
return -1;
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
pRsp->handle = htobe64(pShow->id);
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0};
SSchema* p = &pShow->pMeta->pSchemas[i];
idata.info.bytes = p->bytes;
idata.info.type = p->type;
idata.info.colId = p->colId;
taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(p->type)) {
pBlock->info.hasVarCol = true;
}
}
blockDataEnsureCapacity(pBlock, rowsToRead);
if (mndCheckRetrieveFinished((SShowObj*) pShow)) {
mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
rowsRead = 0;
} else {
// if free flag is set, client wants to clean the resources
rowsRead = (*retrieveFp)(pReq, (SShowObj *)pShow, pRsp->data, rowsToRead);
rowsRead = (*retrieveFp)(pReq, (SShowObj *)pShow, pBlock, rowsToRead);
if (rowsRead < 0) {
terrno = rowsRead;
rpcFreeCont(pRsp);
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj((SShowObj *)pShow, true);
return -1;
}
pBlock->info.rows = rowsRead;
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
}
// numOfCols + sizeof(SSysTableSchema) * numOfCols + data payload
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + blockDataGetSize(pBlock)
+ blockDataGetSerialMetaSize(pBlock);
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
mndReleaseShowObj((SShowObj*) pShow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, terrstr());
blockDataDestroy(pBlock);
return -1;
}
pRsp->handle = htobe64(pShow->id);
// if free flag is set, client wants to clean the resources
if (rowsRead > 0) {
char * pStart = pRsp->data;
SSchema *ps = pShow->pMeta->pSchemas;
*(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
pStart += sizeof(int32_t); // number of columns
for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
pSchema->bytes = htonl(ps[i].bytes);
pSchema->colId = htons(ps[i].colId);
pSchema->type = ps[i].type;
pStart += sizeof(SSysTableSchema);
}
int32_t len = 0;
blockCompressEncode(pBlock, pStart, &len, pShow->pMeta->numOfColumns, false);
}
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
pReq->pRsp = pRsp;
pReq->rspLen = size;
pReq->pRsp = pRsp;
pReq->rspLen = size;
if (rowsRead == 0 || rowsRead < rowsToRead) {
pRsp->completed = 1;
......@@ -219,6 +257,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
mndReleaseShowObj((SShowObj*) pShow, false);
}
blockDataDestroy(pBlock);
return TSDB_CODE_SUCCESS;
}
......
......@@ -40,8 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) {
......@@ -765,14 +764,12 @@ static int32_t mndGetSmaMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
return 0;
}
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SSmaObj *pSma = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
......@@ -790,19 +787,25 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
SName smaName = {0};
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, (char *)tNameGetTableName(&smaName));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pSma->createdTime;
char n[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(n, (char *)tNameGetTableName(&smaName));
cols++;
SName stbName = {0};
tNameFromString(&stbName, pSma->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, (char *)tNameGetTableName(&stbName));
cols++;
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) n, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
numOfRows++;
sdbRelease(pSdb, pSma);
......@@ -810,7 +813,6 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
......
......@@ -33,7 +33,7 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
int32_t mndInitSnode(SMnode *pMnode) {
......@@ -447,38 +447,34 @@ static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SSnodeObj *pObj = NULL;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_SNODE, pShow->pIter, (void **)&pObj);
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*)&pObj->id, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pObj->id;
cols++;
char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->bytes[cols]);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pObj->pDnode->ep, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)ep, false);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pObj->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
numOfRows++;
sdbRelease(pSdb, pObj);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
......
......@@ -39,7 +39,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SNodeMsg *pRsp);
static int32_t mndProcessTableMetaReq(SNodeMsg *pReq);
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) {
......@@ -1638,13 +1638,12 @@ static void mndExtractTableName(char *tableId, char *name) {
}
}
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStbObj *pStb = NULL;
int32_t cols = 0;
char *pWrite;
SDbObj* pDb = NULL;
if (strlen(pShow->db) > 0) {
......@@ -1662,44 +1661,46 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
}
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN] = {0};
mndExtractTableName(pStb->name, stbName);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName);
cols++;
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTableName(pStb->name, &stbName[VARSTR_HEADER_SIZE]);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
char db[TSDB_DB_NAME_LEN] = {0};
colDataAppend(pColInfo, numOfRows, (const char*) stbName, false);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pStb->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pStb->numOfTags;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->updateTime; // number of tables
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pStb->commentLen != 0) {
STR_TO_VARSTR(pWrite, pStb->comment);
} else {
STR_TO_VARSTR(pWrite, "");
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
colDataAppend(pColInfo, numOfRows, (const char*) db, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->createdTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfColumns, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->numOfTags, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables
char* p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures
if (p != NULL) {
if (pStb->commentLen != 0) {
STR_TO_VARSTR(p, pStb->comment);
} else {
STR_TO_VARSTR(p, "");
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)p, false);
taosMemoryFree(p);
}
cols++;
numOfRows++;
sdbRelease(pSdb, pStb);
......@@ -1710,7 +1711,6 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
}
pShow->numOfRows += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
......
......@@ -58,7 +58,7 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextStream);
return sdbSetTable(pMnode->pSdb, table);
......@@ -516,7 +516,6 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, in
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
......
......@@ -35,7 +35,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndProcessTopicMetaReq(SNodeMsg *pReq);
static int32_t mndGetTopicMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
......@@ -53,7 +52,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
return sdbSetTable(pMnode->pSdb, table);
......@@ -596,7 +595,6 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
return numOfRows;
}
......
......@@ -74,7 +74,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -1370,7 +1370,6 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, char *data, int
sdbRelease(pSdb, pTrans);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -35,8 +35,7 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq);
static int32_t mndProcessAlterUserReq(SNodeMsg *pReq);
static int32_t mndProcessDropUserReq(SNodeMsg *pReq);
static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq);
static int32_t mndGetUserMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
int32_t mndInitUser(SMnode *pMnode) {
......@@ -640,53 +639,7 @@ GET_AUTH_OVER:
return code;
}
static int32_t mndGetUserMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "account");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -700,33 +653,32 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, char *data, int
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superUser) {
const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else {
const char *src = "normal";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
}
cols++;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char name[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->user, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
const char* src = pUser->superUser? "super":"normal";
char b[10+VARSTR_HEADER_SIZE] = {0};
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
colDataAppend(pColInfo, numOfRows, (const char*) b, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pUser->createdTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pUser->createdTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->acct, pShow->bytes[cols]);
cols++;
STR_WITH_MAXSIZE_TO_VARSTR(name, pUser->acct, pShow->bytes[cols]);
colDataAppend(pColInfo, numOfRows, (const char*) name, false);
numOfRows++;
sdbRelease(pSdb, pUser);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -35,10 +35,9 @@ static int32_t mndProcessDropVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessSyncVnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) {
......@@ -499,58 +498,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return 0;
}
static int32_t mndGetVgroupMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) {
return -1;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgId");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
for (int32_t i = 0; i < pShow->replica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_status", i + 1);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
}
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -575,44 +523,41 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, i
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->vgId;
cols++;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
SName name = {0};
char db[TSDB_DB_NAME_LEN] = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db);
cols++;
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->numOfTables;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
//status
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, "ready"); // TODO
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
//onlines
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->replica;
cols++;
// status
char buf[10] = {0};
STR_TO_VARSTR(buf, "ready"); // TODO
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, buf, false);
// onlines
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->replica, false);
for (int32_t i = 0; i < pVgroup->replica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
char buf1[20] = {0};
const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
cols++;
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->bytes[cols]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf1, false);
}
numOfRows++;
......@@ -623,7 +568,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, char *data, i
mndReleaseDb(pMnode, pDb);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......@@ -653,12 +597,11 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
char *pWrite;
int32_t cols = 0;
// int32_t dnodeId = pShow->replica;
......@@ -670,30 +613,29 @@ static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVgroup->vgId;
cols++;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vgId, false);
SName name = {0};
char db[TSDB_DB_NAME_LEN] = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)db, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = 0; //todo: Tables
cols++;
uint32_t val = 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&val, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mndGetRoleStr(pVgid->role));
cols++;
char buf[20] = {0};
STR_TO_VARSTR(buf, mndGetRoleStr(pVgid->role));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVgroup->replica; //onlines
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->replica, false); // onlines
numOfRows++;
}
......@@ -701,7 +643,6 @@ static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
sdbRelease(pSdb, pVgroup);
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfRows += numOfRows;
return numOfRows;
}
......
......@@ -451,13 +451,13 @@ typedef struct SSysTableScanInfo {
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SNode *pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store.
SArray *scanCols; // SArray<int16_t> scan column id list
int32_t type; // show type, TODO remove it
SName name;
SSDataBlock* pRes;
SSDataBlock *pRes;
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
......
......@@ -4007,10 +4007,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
return TSDB_CODE_SUCCESS;
}
// TODO if only one or two columnss required, how to extract data?
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
uint64_t* total, SArray* pColList) {
// TODO if only one or two columns required, how to extract data?
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) {
blockDataEnsureCapacity(pRes, numOfRows);
if (pColList == NULL) { // data from other sources
......@@ -4040,18 +4039,70 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
}
} else { // extract data according to pColList
ASSERT(numOfOutput == taosArrayGetSize(pColList));
char* pStart = pData;
int32_t numOfCols = htonl(*(int32_t*)pStart);
pStart += sizeof(int32_t);
SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
for(int32_t i = 0; i < numOfCols; ++i) {
SSysTableSchema* p = (SSysTableSchema*)pStart;
p->colId = htons(p->colId);
p->bytes = htonl(p->bytes);
pStart += sizeof(SSysTableSchema);
}
SSDataBlock block = {.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)), .info.numOfCols = numOfCols};
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0};
idata.info.type = pSchema[i].type;
idata.info.bytes = pSchema[i].bytes;
idata.info.colId = pSchema[i].colId;
taosArrayPush(block.pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
block.info.hasVarCol = true;
}
}
blockDataEnsureCapacity(&block, numOfRows);
int32_t* colLen = (int32_t*) pStart;
pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] > 0);
SColumnInfoData* pColInfoData = taosArrayGet(block.pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colLen[i];
pColInfoData->varmeta.allocLen = colLen[i];
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
pStart += sizeof(int32_t) * numOfRows;
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
} else {
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows);
}
memcpy(pColInfoData->pData, pStart, colLen[i]);
pStart += colLen[i];
}
// data from mnode
for (int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pSrc = taosArrayGet(block.pDataBlock, i);
for (int32_t j = 0; j < numOfOutput; ++j) {
int16_t colIndex = *(int16_t*)taosArrayGet(pColList, j);
if (colIndex - 1 == i) {
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, j);
for (int32_t k = 0; k < numOfRows; ++k) {
colDataAppend(pColInfoData, k, pData, false);
pData += pColInfoData->info.bytes;
}
colDataAssign(pColInfoData, pSrc, numOfRows);
break;
}
}
......
......@@ -756,12 +756,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return NULL;
}
pInfo->accountId = accountId;
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
......@@ -833,15 +833,15 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
#endif
}
pOperator->name = "SysTableScanOperator";
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysScanOperator;
pOperator->pTaskInfo = pTaskInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->getNextFn = doSysTableScan;
pOperator->closeFn = destroySysScanOperator;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册