diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4f02c559b1cc9adee4f54deb352517cd686bac4a..2e721c32369a3ca3abb4aed649d7feb73594d1fc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -421,19 +421,23 @@ typedef struct SSysTableScanInfo { SRetrieveTableReq req; SEpSet epSet; tsem_t ready; - - SReadHandle readHandle; - int32_t accountId; - bool showRewrite; - SNode* pCondition; // db_name filter condition, to discard data that are not in current database - SMTbCursor* pCur; // cursor for iterate the local table meta store. - SArray* scanCols; // SArray scan column id list - SName name; - SSDataBlock* pRes; - int64_t numOfBlocks; // extract basic running information. - SLoadRemoteDataInfo loadInfo; + SReadHandle readHandle; + int32_t accountId; + bool showRewrite; + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + SMTbCursor* pCur; // cursor for iterate the local table meta store. + SArray* scanCols; // SArray scan column id list + SName name; + SSDataBlock* pRes; + int64_t numOfBlocks; // extract basic running information. + SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; +typedef struct SBlockDistInfo { + SSDataBlock* pResBlock; + void* pHandle; +} SBlockDistInfo; + typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; int32_t* rowCellInfoOffset; // offset value for each row result cell info diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b7768bb7463051dee11922cf23c196e9dcda4d90..2801c55189474b3ffba18ea60b52bb426d0bd2c6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -650,38 +650,43 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; pBlock->info.rows = 1; - pBlock->info.numOfCols = 1; - // SBufferWriter bw = tbufInitWriter(NULL, false); - // blockDistInfoToBinary(&blockDistInfo, &bw); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); - // int32_t len = (int32_t) tbufTell(&bw); - // pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); - // *(int32_t*) pColInfo->pData = len; - // memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); - // - // tbufCloseWriter(&bw); + int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + varDataSetLen(p, len); + + colDataAppend(pColInfo, 0, p, false); + taosMemoryFree(p); pOperator->status = OP_EXEC_DONE; return pBlock; } +static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { + SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param; + blockDataDestroy(pDistInfo->pResBlock); +} + SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - pInfo->dataReader = dataReader; - // pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pInfo->pHandle = dataReader; + + pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = 1024; - // taosArrayPush(pInfo->block.pDataBlock, &infoData); + + taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; @@ -690,8 +695,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, NULL, NULL, NULL, NULL); - + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); return pOperator; _error: diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 6cf4a47bb92e7c2c37d4543f515ffca37be7c287..75b22a2a037d98b2fe98de9b148d5245332d6ada 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1142,6 +1142,11 @@ static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32 return TSDB_CODE_SUCCESS; } +static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(STableBlockDistInfo); + return true; +} + // clang-format off const SBuiltinFuncDefinition funcMgtBuiltins[] = { @@ -1877,7 +1882,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BLOCK_DIST, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, - .getEnvFunc = NULL, + .getEnvFunc = getBlockDistFuncEnv, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dda8a8a890d4d569d4f36c72be5f03a3effe5d28..b8d2fae909c1c96cce2216f5997741a8529989e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4439,8 +4439,29 @@ int32_t blockDistFunction(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - char *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - memcpy(pInfo, pInputCol->pData, varDataTLen(pInputCol->pData)); + STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); + + STableBlockDistInfo p1 = {0}; + tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); + + pDistInfo->numOfBlocks += p1.numOfBlocks; + pDistInfo->numOfTables += p1.numOfTables; + pDistInfo->numOfInmemRows += p1.numOfInmemRows; + pDistInfo->totalSize += p1.totalSize; + pDistInfo->totalRows += p1.totalRows; + pDistInfo->numOfFiles += p1.numOfFiles; + + if (pDistInfo->minRows > p1.minRows) { + pDistInfo->minRows = p1.minRows; + } + if (pDistInfo->maxRows < p1.maxRows) { + pDistInfo->maxRows = p1.maxRows; + } + + for(int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) { + pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; + } + pResInfo->numOfRes = 1; return TSDB_CODE_SUCCESS; }