From 85d4ee6031062f448f0eca6d56e64c97a9324c24 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jun 2022 10:29:53 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 26 +++++++++++-------- source/libs/executor/src/scanoperator.c | 34 ++++++++++++++----------- source/libs/function/src/builtins.c | 7 ++++- source/libs/function/src/builtinsimpl.c | 25 ++++++++++++++++-- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4f02c559b1..2e721c3236 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -421,19 +421,23 @@ typedef struct SSysTableScanInfo { SRetrieveTableReq req; SEpSet epSet; tsem_t ready; - - SReadHandle readHandle; - int32_t accountId; - bool showRewrite; - SNode* pCondition; // db_name filter condition, to discard data that are not in current database - SMTbCursor* pCur; // cursor for iterate the local table meta store. - SArray* scanCols; // SArray scan column id list - SName name; - SSDataBlock* pRes; - int64_t numOfBlocks; // extract basic running information. - SLoadRemoteDataInfo loadInfo; + SReadHandle readHandle; + int32_t accountId; + bool showRewrite; + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + SMTbCursor* pCur; // cursor for iterate the local table meta store. + SArray* scanCols; // SArray scan column id list + SName name; + SSDataBlock* pRes; + int64_t numOfBlocks; // extract basic running information. + SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; +typedef struct SBlockDistInfo { + SSDataBlock* pResBlock; + void* pHandle; +} SBlockDistInfo; + typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; int32_t* rowCellInfoOffset; // offset value for each row result cell info diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b7768bb746..2801c55189 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -650,38 +650,43 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; pBlock->info.rows = 1; - pBlock->info.numOfCols = 1; - // SBufferWriter bw = tbufInitWriter(NULL, false); - // blockDistInfoToBinary(&blockDistInfo, &bw); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); - // int32_t len = (int32_t) tbufTell(&bw); - // pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t)); - // *(int32_t*) pColInfo->pData = len; - // memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len); - // - // tbufCloseWriter(&bw); + int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); + varDataSetLen(p, len); + + colDataAppend(pColInfo, 0, p, false); + taosMemoryFree(p); pOperator->status = OP_EXEC_DONE; return pBlock; } +static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { + SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param; + blockDataDestroy(pDistInfo->pResBlock); +} + SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { - STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); + SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - pInfo->dataReader = dataReader; - // pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pInfo->pHandle = dataReader; + + pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.bytes = 1024; - // taosArrayPush(pInfo->block.pDataBlock, &infoData); + + taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; @@ -690,8 +695,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, NULL, NULL, NULL, NULL); - + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); return pOperator; _error: diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 6cf4a47bb9..75b22a2a03 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1142,6 +1142,11 @@ static int32_t translateBlockDistFunc(SFunctionNode* pFunc, char* pErrBuf, int32 return TSDB_CODE_SUCCESS; } +static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(STableBlockDistInfo); + return true; +} + // clang-format off const SBuiltinFuncDefinition funcMgtBuiltins[] = { @@ -1877,7 +1882,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BLOCK_DIST, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, - .getEnvFunc = NULL, + .getEnvFunc = getBlockDistFuncEnv, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index dda8a8a890..b8d2fae909 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -4439,8 +4439,29 @@ int32_t blockDistFunction(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - char *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - memcpy(pInfo, pInputCol->pData, varDataTLen(pInputCol->pData)); + STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); + + STableBlockDistInfo p1 = {0}; + tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); + + pDistInfo->numOfBlocks += p1.numOfBlocks; + pDistInfo->numOfTables += p1.numOfTables; + pDistInfo->numOfInmemRows += p1.numOfInmemRows; + pDistInfo->totalSize += p1.totalSize; + pDistInfo->totalRows += p1.totalRows; + pDistInfo->numOfFiles += p1.numOfFiles; + + if (pDistInfo->minRows > p1.minRows) { + pDistInfo->minRows = p1.minRows; + } + if (pDistInfo->maxRows < p1.maxRows) { + pDistInfo->maxRows = p1.maxRows; + } + + for(int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) { + pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; + } + pResInfo->numOfRes = 1; return TSDB_CODE_SUCCESS; } -- GitLab