diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e83b992f06c39250b5e5b48a37b60843c4504823..70d89102e67dd933a6228402f647a212d953f63a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,7 +116,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, void *pMemRef); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d399b5e3bf0001128dcdda812a875dca163d919b..70b6e24b07c4a990921517684d4d65ce5f677a34 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, void* pMemRef); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7a40eeeb94f8b81984f0878d52c247be9c9c322..ce73246e51adcb3906b16a56084d44cf2fb0bed4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi } if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*)pTsdbReadHandle; + return (tsdbReaderT)pTsdbReadHandle; } // todo apply the lastkey of table check to avoid to load header file diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 00643be0e191c787e3d30f2c37784942d6cbc14f..ae0f669d656a458a310339e461ad2e55918ee211 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -412,6 +412,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); @@ -426,27 +427,57 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, "I/O: "); int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); - for (int32_t i = 0; i < nodeNum; ++i) { + struct STableScanAnalyzeInfo info = {0}; + + int32_t maxIndex = 0; + int32_t totalRows = 0; + for(int32_t i = 0; i < nodeNum; ++i) { SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i); STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo; - EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + info.totalBlocks += pScanInfo->totalBlocks; + info.loadBlocks += pScanInfo->loadBlocks; + info.totalRows += pScanInfo->totalRows; + info.skipBlocks += pScanInfo->skipBlocks; + info.filterTime += pScanInfo->filterTime; + info.loadBlockStatis += pScanInfo->loadBlockStatis; + info.totalCheckedRows += pScanInfo->totalCheckedRows; + info.filterOutBlocks += pScanInfo->filterOutBlocks; + + if (pScanInfo->totalRows > totalRows) { + totalRows = pScanInfo->totalRows; + maxIndex = i; + } + } - EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("total_blocks=%.1f", ((double)info.totalBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("load_blocks=%.1f", ((double)info.loadBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("load_block_SMAs=%.1f", ((double)info.loadBlockStatis) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - } + EXPLAIN_ROW_APPEND("total_rows=%.1f", ((double)info.totalRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND("check_rows=%.1f", ((double)info.totalCheckedRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_END(); + + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + //Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms. + SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex); + STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo; + + EXPLAIN_ROW_NEW(level + 1, " "); + EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd", + execInfo->startupCost, execInfo->totalCost); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 756eb5f3758514d201b44c3d914bf1b46f5b871e..c33b6622e3a5b4e341517478a6a0195e81800434 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -740,7 +742,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a15cc94813c927a3e14b50860531382f73441481..5c038ed7098d79f2de2e5d36b19724aa916f096c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2815,7 +2815,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan // todo add more information about exchange operation int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) { + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -2842,8 +2843,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; - SExprSupp* pSup = &pOperator->exprSupp; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SExprSupp* pSup = &pOperator->exprSupp; SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); @@ -4095,6 +4095,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { + SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode; + pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + + if (pBlockNode->tableType == TSDB_SUPER_TABLE) { + int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; + return NULL; + } + } else { // Create one table group. + STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; + taosArrayPush(pTableListInfo->pTableList, &info); + } + + SQueryTableDataCond cond = {0}; + + { + cond.order = TSDB_ORDER_ASC; + cond.numOfCols = 1; + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + cond.colList->colId = 1; + cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; + cond.colList->bytes = sizeof(TSKEY); + + cond.numOfTWindows = 1; + cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow)); + cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + cond.suid = pBlockNode->suid; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + } + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + cleanupQueryTableDataCond(&cond); + + return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); } else { ASSERT(0); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c5e3f1a87c63a1c95e66c3af053a21e604d7c0b8..c47e9dd4aee34289f975ae7d3d3d32a6536be52f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -367,13 +368,14 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = pTableScanInfo->pResBlock; int64_t st = taosGetTimestampUs(); while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -396,7 +398,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupId) { pBlock->info.groupId = *groupId; } @@ -589,44 +591,76 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; } +static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { + int32_t rowLen = 0; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pMeta, 0); + metaGetTableEntryByUid(&mr, uid); + if (mr.me.type == TSDB_SUPER_TABLE) { + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_CHILD_TABLE) { + uint64_t suid = mr.me.ctbEntry.suid; + metaGetTableEntryByUid(&mr, suid); + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; + } + } + + metaReaderClear(&mr); + return rowLen; +} + static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - STableScanInfo* pTableScanInfo = pOperator->info; + SBlockDistInfo* pBlockScanInfo = pOperator->info; - STableBlockDistInfo blockDistInfo = {0}; - blockDistInfo.maxRows = INT_MIN; - blockDistInfo.minRows = INT_MAX; + STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; + blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid); - tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - pBlock->info.rows = 1; + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); + blockDataEnsureCapacity(pBlock, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); + pBlock->info.rows = 1; + pOperator->status = OP_EXEC_DONE; return pBlock; } @@ -636,7 +670,8 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pDistInfo->pResBlock); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, + SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -644,21 +679,20 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; - - pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; - - taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); + pInfo->pHandle = dataReader; + pInfo->readHandle = *readHandle; + pInfo->uid = uid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); - pOperator->name = "DataBlockInfoScanOperator"; - // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); + + pOperator->name = "DataBlockDistScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, @@ -1837,21 +1871,23 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; - pInfo->pRes = createResDataBlock(pDescNode); - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pInfo->pFilterNode = pPhyNode->node.pConditions; + + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pInfo->pFilterNode = pPhyNode->node.pConditions; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfExprs; - pOperator->pTaskInfo = pTaskInfo; + + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c243c1c175ee0e3a909af3db4252127bf9aa404d..e691d562c6b4426015a7a7d10906d6c4b307cb5f 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx); int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 522ee09b3cac7e4865be72ab381ff6853bf1301d..cfad00f45898a60dc196edae522c67246d5afb69 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } -int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { +int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { int32_t code = nodesListMakeAppend(pParameters, pPartialRes); if (TSDB_CODE_SUCCESS == code) { code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1))); @@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL return TSDB_CODE_SUCCESS; } -static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { - int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); - - if (isPartial) { - if (2 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // param1 - SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); - if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - SValueNode* pValue = (SValueNode*)pParamNode1; - if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - if (pValue->datum.i < 1 || pValue->datum.i > 100) { - return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); - } - - pValue->notReserved = true; - - // set result type - pFunc->node.resType = - (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; - } else { - if (1 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (TSDB_DATA_TYPE_BINARY != para1Type) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // Do nothing. We can only access output of partial functions as input, - // so original input type cannot be obtained, resType will be set same - // as original function input type after merge function created. - } - return TSDB_CODE_SUCCESS; -} - -static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, true); -} - -static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, false); -} - static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = topFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, - .pPartialFunc = "_top_partial", - .pMergeFunc = "_top_merge", - // .createMergeParaFuc = topCreateMergePara - }, - { - .name = "_top_partial", - .type = FUNCTION_TYPE_TOP_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = topFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = topCombine, - }, - { - .name = "_top_merge", - .type = FUNCTION_TYPE_TOP_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = topFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = topCombine, + .pPartialFunc = "top", + .pMergeFunc = "top", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "bottom", @@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, - .pPartialFunc = "_bottom_partial", - .pMergeFunc = "_bottom_merge" - }, - { - .name = "_bottom_partial", - .type = FUNCTION_TYPE_BOTTOM_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = bottomFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = bottomCombine, - }, - { - .name = "_bottom_merge", - .type = FUNCTION_TYPE_BOTTOM_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = bottomFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = bottomCombine, + .pPartialFunc = "bottom", + .pMergeFunc = "bottom", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "spread", @@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .initFunc = functionSetup, .processFunc = NULL, - .finalizeFunc = NULL + .finalizeFunc = NULL, + .pPartialFunc = "_select_value", + .pMergeFunc = "_select_value" }, { .name = "_block_dist", @@ -2532,6 +2432,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, .getEnvFunc = getBlockDistFuncEnv, + .initFunc = blockDistSetup, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c72d7b5106eb2f4ad14be2b1ed35b4da56c148fb..01fcc61cf07f6e1e1e74a71dd0b605b9e661852a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -67,8 +67,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; - int16_t type; // store the original input type, used in merge function - int32_t numOfItems; + int16_t type; STopBotResItem* pItems; } STopBotRes; @@ -2872,12 +2871,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - // intermediate result is binary and length contains VAR header size - pEnv->calcMemSize = pFunc->node.resType.bytes; - return true; -} - bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { if (!functionSetup(pCtx, pResInfo)) { return false; @@ -2952,50 +2945,6 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { - for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); - } -} - -int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, true); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - -int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, false); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) { uint16_t type = *(uint16_t*)param; @@ -3044,8 +2993,6 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -3144,7 +3091,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS releaseBufPage(pCtx->pBuf, pPage); } -int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) { +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -3164,39 +3111,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - if (!isMerge) { - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); - } + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); currentRow += 1; } return pEntryInfo->numOfRes; } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); } - -int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - return topBotFinalizeImpl(pCtx, pBlock, true); -} - -int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getTopBotInfoSize(pRes->maxSize); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - - memcpy(varDataVal(res), pRes, resultBytes); - varDataSetLen(res, resultBytes); - - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - - colDataAppend(pCol, pBlock->info.rows, res, false); - - taosMemoryFree(res); - return 1; -} - void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -3211,8 +3132,6 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -5004,7 +4923,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->minRows = INT32_MAX; + return true; +} + int32_t blockDistFunction(SqlFunctionCtx* pCtx) { + const int32_t BLOCK_DIST_RESULT_ROWS = 24; + SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -5022,6 +4953,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; + pDistInfo->defMinRows = p1.defMinRows; + pDistInfo->defMaxRows = p1.defMaxRows; + pDistInfo->rowSize = p1.rowSize; + pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks; + if (pDistInfo->minRows > p1.minRows) { pDistInfo->minRows = p1.minRows; } @@ -5033,7 +4969,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; } - pResInfo->numOfRes = 1; + pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows return TSDB_CODE_SUCCESS; } @@ -5045,7 +4981,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; @@ -5076,7 +5012,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; @@ -5098,32 +5034,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* pData = GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t row = 0; - - STableBlockDistInfo info = {0}; - tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info); - char st[256] = {0}; + double totalRawSize = pData->totalRows * pData->rowSize; int32_t len = - sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", - info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0), - info.totalSize / (info.totalRows * info.rowSize * 1.0)); + sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", + pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks, + pData->totalSize * 100 / totalRawSize, '%'); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]", - info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]", + pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, - info.numOfFiles, 0); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, + pData->numOfFiles, 0); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); @@ -5135,40 +5068,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t maxVal = 0; int32_t minVal = INT32_MAX; - for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) { - if (maxVal < info.blockRowsHisto[i]) { - maxVal = info.blockRowsHisto[i]; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + if (maxVal < pData->blockRowsHisto[i]) { + maxVal = pData->blockRowsHisto[i]; } - if (minVal > info.blockRowsHisto[i]) { - minVal = info.blockRowsHisto[i]; + if (minVal > pData->blockRowsHisto[i]) { + minVal = pData->blockRowsHisto[i]; } } int32_t delta = maxVal - minVal; int32_t step = delta / 50; + if (step == 0) { + step = 1; + } + + int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]); + int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets; + + bool singleModel = false; + if (bucketRange == 0) { + singleModel = true; + step = 20; + bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets; + } - int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); - int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1)); - for (int32_t i = 0; i < 20; ++i) { - len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1)); + int32_t num = 0; + if (singleModel && pData->blockRowsHisto[i] > 0) { + num = 20; + } else { + num = (pData->blockRowsHisto[i] + step - 1) / step; + } - int32_t num = (info.blockRowsHisto[i] + step - 1) / step; for (int32_t j = 0; j < num; ++j) { int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|'); len += x; } - double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks; - len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%'); + double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks; + len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%'); printf("%s\n", st); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); } - return row; + return TSDB_CODE_SUCCESS; } typedef struct SDerivInfo { diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index e3218f972b0076e2d6ca3f7f2e303383658b24cb..60589784511dbe2eab5ab16906f5149e7e2bd5da 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -419,6 +419,24 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); } +static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) { + SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; + strcpy(name.dbname, pStmt->dbName); + strcpy(name.tname, pStmt->tableName); + int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS == code) { + code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); + } + return code; +} + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -497,6 +515,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_DELETE_STMT: return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); + case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: + return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); default: break; }