From 659e429b87e02d51b5653624b4032b821eed04df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Jun 2022 12:54:46 +0800 Subject: [PATCH] feature(query): support show table block distribution. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +- source/libs/executor/inc/executorimpl.h | 4 +- source/libs/executor/src/executorimpl.c | 50 ++++++++++++-- source/libs/executor/src/scanoperator.c | 69 +++++++++++++++----- source/libs/function/inc/builtinsimpl.h | 2 + source/libs/function/src/builtins.c | 1 + source/libs/function/src/builtinsimpl.c | 86 +++++++++++++++++-------- source/libs/parser/src/parAstParser.c | 17 +++++ 10 files changed, 184 insertions(+), 53 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3cfca66a39..745bd7c72c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,7 +116,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, void *pMemRef); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 553c6a40ab..e0e218dbf9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -119,7 +119,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, void* pMemRef); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7a40eeeb9..ce73246e51 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi } if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*)pTsdbReadHandle; + return (tsdbReaderT)pTsdbReadHandle; } // todo apply the lastkey of table check to avoid to load header file diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 28241d778c..5a1c007d6e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -735,7 +737,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 421fd56e1f..b6477e6db1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -502,6 +502,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; + if (slotId >= taosArrayGetSize(pBlock->pDataBlock)) { + slotId = taosArrayGetSize(pBlock->pDataBlock) - 1; + } pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); pInput->totalRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows; @@ -2812,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan // todo add more information about exchange operation int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) { + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -2840,7 +2844,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SExprSupp* pSup = &pOperator->exprSupp; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); @@ -4087,6 +4090,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { + SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode; + pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + + if (pBlockNode->tableType == TSDB_SUPER_TABLE) { + int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; + return NULL; + } + } else { // Create one table group. + STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; + taosArrayPush(pTableListInfo->pTableList, &info); + } + + SQueryTableDataCond cond = {0}; + + { + cond.order = TSDB_ORDER_ASC; + cond.numOfCols = 1; + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + cond.colList->colId = 1; + cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; + cond.colList->bytes = sizeof(TSKEY); + + cond.numOfTWindows = 1; + cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow)); + cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + cond.suid = pBlockNode->suid; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + } + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + cleanupQueryTableDataCond(&cond); + + return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pTaskInfo); } else { ASSERT(0); } @@ -4284,8 +4327,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { - int32_t code = - getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); + int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index dcb5e2cd2c..5b8eff79de 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -590,12 +591,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; @@ -606,16 +607,48 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { return NULL; } - STableScanInfo* pTableScanInfo = pOperator->info; + SBlockDistInfo* pBlockScanInfo = pOperator->info; STableBlockDistInfo blockDistInfo = {0}; blockDistInfo.maxRows = INT_MIN; blockDistInfo.minRows = INT_MAX; - tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + SMetaReader mr = {0}; + metaReaderInit(&mr, pBlockScanInfo->readHandle.meta, 0); + metaGetTableEntryByUid(&mr, pBlockScanInfo->uid); + if (mr.me.type == TSDB_SUPER_TABLE) { + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + int32_t rowLen = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + blockDistInfo.rowSize = rowLen; + } else if (mr.me.type == TSDB_CHILD_TABLE) { + uint64_t suid = mr.me.ctbEntry.suid; + metaGetTableEntryByUid(&mr, suid); + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + + int32_t rowLen = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + blockDistInfo.rowSize = rowLen; + + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; + int32_t rowLen = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; + } + blockDistInfo.rowSize = rowLen; + } + + metaReaderClear(&mr); - SSDataBlock* pBlock = pTableScanInfo->pResBlock; + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); + + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; pBlock->info.rows = 1; SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -625,6 +658,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); + colInfoDataEnsureCapacity(pColInfo, 0, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); @@ -637,7 +671,7 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pDistInfo->pResBlock); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -645,7 +679,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; + pInfo->readHandle = *readHandle; + pInfo->uid = uid; pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -653,13 +689,14 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = 1024; + pInfo->pResBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); - pOperator->name = "DataBlockInfoScanOperator"; - // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->name = "DataBlockDistScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c243c1c175..e691d562c6 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx); int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d2985a33a8..62565cb209 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2523,6 +2523,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, .getEnvFunc = getBlockDistFuncEnv, + .initFunc = blockDistSetup, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 55900c7c7e..cb8cc4e88d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5008,7 +5008,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->minRows = INT32_MAX; + return true; +} + int32_t blockDistFunction(SqlFunctionCtx* pCtx) { + const int32_t BLOCK_DIST_RESULT_ROWS = 24; + SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -5026,6 +5038,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; + pDistInfo->defMinRows = p1.defMinRows; + pDistInfo->defMaxRows = p1.defMaxRows; + pDistInfo->rowSize = p1.rowSize; + pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks; + if (pDistInfo->minRows > p1.minRows) { pDistInfo->minRows = p1.minRows; } @@ -5037,7 +5054,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; } - pResInfo->numOfRes = 1; + pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows return TSDB_CODE_SUCCESS; } @@ -5049,7 +5066,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; @@ -5080,7 +5097,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; @@ -5102,32 +5119,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* pData = GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t row = 0; - - STableBlockDistInfo info = {0}; - tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info); - char st[256] = {0}; + double totalRawSize = pData->totalRows * pData->rowSize; int32_t len = - sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", - info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0), - info.totalSize / (info.totalRows * info.rowSize * 1.0)); + sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", + pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks, + pData->totalSize * 100 / totalRawSize, '%'); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]", - info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]", + pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, - info.numOfFiles, 0); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, + pData->numOfFiles, 0); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); @@ -5139,40 +5153,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t maxVal = 0; int32_t minVal = INT32_MAX; - for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) { - if (maxVal < info.blockRowsHisto[i]) { - maxVal = info.blockRowsHisto[i]; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + if (maxVal < pData->blockRowsHisto[i]) { + maxVal = pData->blockRowsHisto[i]; } - if (minVal > info.blockRowsHisto[i]) { - minVal = info.blockRowsHisto[i]; + if (minVal > pData->blockRowsHisto[i]) { + minVal = pData->blockRowsHisto[i]; } } int32_t delta = maxVal - minVal; int32_t step = delta / 50; + if (step == 0) { + step = 1; + } + + int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]); + int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets; - int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); - int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets; + bool singleModel = false; + if (bucketRange == 0) { + singleModel = true; + step = 20; + bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets; + } - for (int32_t i = 0; i < 20; ++i) { - len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1)); + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1)); + + int32_t num = 0; + if (singleModel && pData->blockRowsHisto[i] > 0) { + num = 20; + } else { + num = (pData->blockRowsHisto[i] + step - 1) / step; + } - int32_t num = (info.blockRowsHisto[i] + step - 1) / step; for (int32_t j = 0; j < num; ++j) { int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|'); len += x; } - double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks; - len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%'); + double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks; + len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%'); printf("%s\n", st); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); } - return row; + return TSDB_CODE_SUCCESS; } typedef struct SDerivInfo { diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index e3218f972b..8a05a84827 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -419,6 +419,21 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); } +static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) { + SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; + strcpy(name.dbname, pStmt->dbName); + strcpy(name.tname, pStmt->tableName); + int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + return code; +} + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -497,6 +512,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_DELETE_STMT: return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); + case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: + return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); default: break; } -- GitLab