From 4b36763429b83275c6bc473d098ee6486eabe26f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Apr 2022 22:43:07 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- include/libs/function/function.h | 21 +-- source/client/src/clientImpl.c | 4 +- source/libs/executor/src/executorimpl.c | 26 ++-- source/libs/function/inc/builtinsimpl.h | 8 +- source/libs/function/src/builtinsimpl.c | 193 ++++++++++++++++++------ 5 files changed, 179 insertions(+), 73 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 004d834287..ed8d54cef8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -37,7 +37,7 @@ typedef struct SFuncExecEnv { typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); -typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId); +typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { @@ -141,8 +141,7 @@ struct SResultRowEntryInfo; //for selectivity query, the corresponding tag value is assigned if the data is qualified typedef struct SSubsidiaryResInfo { - int16_t bufLen; // keep the tags data for top/bottom query result - int16_t numOfCols; + int16_t num; struct SqlFunctionCtx **pCtx; } SSubsidiaryResInfo; @@ -187,8 +186,8 @@ typedef struct SqlFunctionCtx { uint8_t currentStage; // record current running step, default: 0 bool isAggSet; int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it - ///////////////////////////////////////////////////////////////// bool stableQuery; + ///////////////////////////////////////////////////////////////// int16_t functionId; // function id char * pOutput; // final result output buffer, point to sdata->data int32_t numOfParams; @@ -198,11 +197,15 @@ typedef struct SqlFunctionCtx { int32_t offset; SVariant tag; struct SResultRowEntryInfo *resultInfo; - SSubsidiaryResInfo subsidiaryRes; - SPoint1 start; - SPoint1 end; - SFuncExecFuncs fpSet; - SScalarFuncExecFuncs sfp; + SSubsidiaryResInfo subsidiaries; + SPoint1 start; + SPoint1 end; + SFuncExecFuncs fpSet; + SScalarFuncExecFuncs sfp; + SExprInfo *pExpr; + struct SDiskbasedBuf *pBuf; + struct SSDataBlock *pSrcBlock; + int32_t curBufPage; } SqlFunctionCtx; enum { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index aae9303bdc..8fcd16a622 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -105,9 +105,9 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, epSet.epSet.eps[0].port = port; } - char* key = getClusterKey(user, secretEncrypt, ip, port); - SAppInstInfo** pInst = NULL; + char* key = getClusterKey(user, secretEncrypt, ip, port); + SAppInstInfo** pInst = NULL; taosThreadMutexLock(&appInfo.mutex); pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9c437d3f88..b28dadb594 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1082,8 +1082,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; + pCtx[i].order = order; + pCtx[i].size = pBlock->info.rows; + pCtx[i].pSrcBlock = pBlock; pCtx[i].currentStage = MAIN_SCAN; SInputColumnInfoData* pInput = &pCtx[i].input; @@ -1836,9 +1837,8 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } } if (p != NULL) { - p->subsidiaryRes.pCtx = pTagCtx; - p->subsidiaryRes.numOfCols = num; - p->subsidiaryRes.bufLen = tagLen; + p->subsidiaries.pCtx = pTagCtx; + p->subsidiaries.num = num; } else { taosMemoryFreeClear(pTagCtx); } @@ -1865,6 +1865,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SqlFunctionCtx* pCtx = &pFuncCtx[i]; pCtx->functionId = -1; + pCtx->curBufPage = -1; + pCtx->pExpr = pExpr; + if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) { SFuncExecEnv env = {0}; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; @@ -1892,9 +1895,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.type = pFunct->resSchema.type; - pCtx->order = TSDB_ORDER_ASC; + pCtx->order = TSDB_ORDER_ASC; pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; + pCtx->end.key = INT64_MIN; pCtx->numOfParams = pExpr->base.numOfParams; pCtx->param = pFunct->pParam; @@ -1953,7 +1956,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } taosVariantDestroy(&pCtx[i].tag); - taosMemoryFreeClear(pCtx[i].subsidiaryRes.pCtx); + taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); } taosMemoryFreeClear(pCtx); @@ -3139,7 +3142,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); if (pCtx[j].fpSet.process) { - pCtx[j].fpSet.finalize(&pCtx[j], pBlock, slotId); + pCtx[j].fpSet.finalize(&pCtx[j], pBlock); } else { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); @@ -5633,6 +5636,11 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf pBasicInfo->pRes = pResultBlock; doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); + + for(int32_t i = 0; i < numOfCols; ++i) { + pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf; + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index db92740fff..e2b6d9c392 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -24,7 +24,7 @@ extern "C" { #include "functionMgt.h" bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); +int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); @@ -43,12 +43,12 @@ int32_t maxFunction(SqlFunctionCtx *pCtx); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); -int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); +int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunction(SqlFunctionCtx *pCtx); -int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); +int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); @@ -60,7 +60,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3dc2e62e92..f5f1b38e35 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -14,7 +14,7 @@ */ #include "builtinsimpl.h" -#include +#include "function.h" #include "querynodes.h" #include "taggfunction.h" #include "tdatablock.h" @@ -49,7 +49,8 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { return true; } -int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -303,16 +304,16 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } \ } while (0); -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \ + SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ } while (0) #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ @@ -378,8 +379,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { *(int64_t*) buf = val; - for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) { - SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i]; + for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { + SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor __ctx->tag.i = key; __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; @@ -395,8 +396,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { *(uint64_t*) buf = val; - for (int32_t i = 0; i < (pCtx)->subsidiaryRes.numOfCols; ++i) { - SqlFunctionCtx* __ctx = pCtx->subsidiaryRes.pCtx[i]; + for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { + SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor __ctx->tag.i = key; __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; @@ -618,11 +619,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); double avg = pStddevRes->isum / ((double) pStddevRes->count); pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); - return functionFinalize(pCtx, pBlock, slotId); + return functionFinalize(pCtx, pBlock); } typedef struct SPercentileInfo { @@ -744,7 +745,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SVariant* pVal = &pCtx->param[1].param; double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d; @@ -757,7 +758,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t sl } tMemBucketDestroy(pMemBucket); - return functionFinalize(pCtx, pBlock, slotId); + return functionFinalize(pCtx, pBlock); } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -1178,7 +1179,7 @@ typedef struct STopBotResItem { } STopBotResItem; typedef struct STopBotRes { - int32_t pageId; +// int32_t pageId; // int32_t num; STopBotResItem *pItems; } STopBotRes; @@ -1197,15 +1198,16 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { return pRes; } -static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, +static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo); +static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); +static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem); + int32_t topFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - STopBotRes *pRes = getTopBotOutputInfo(pCtx); - // if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { // buildTopBotStruct(pRes, pCtx); // } @@ -1225,7 +1227,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx) { numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo); } return TSDB_CODE_SUCCESS; @@ -1258,9 +1260,11 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par return (val1->v.d > val2->v.d) ? 1 : -1; } +void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, + uint64_t uid, SResultRowEntryInfo* pEntryInfo) { + STopBotRes *pRes = getTopBotOutputInfo(pCtx); + int32_t maxSize = pCtx->param[1].param.i; -void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo) { SVariant val = {0}; taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); @@ -1272,22 +1276,9 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes]; pItem->v = val; pItem->uid = uid; - pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer - - if (pRes->pageId == -1) { - SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId); - pPage->num = sizeof(SFilePage); - // keep the current row data - for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); - bool isNull = colDataIsNull_s(pCol, rowIndex); - - - colDataGetData(pCol, rowIndex); - } - - } + // save the data of this tuple + saveTupleData(pCtx, rowIndex, pSrcBlock, pItem); // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; @@ -1296,22 +1287,100 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t row if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { + // replace the old data and the coresponding tuple data STopBotResItem* pItem = &pItems[0]; pItem->v = val; pItem->uid = uid; - pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer + + // save the data of this tuple by over writing the old data + copyTupleData(pCtx, rowIndex, pSrcBlock, pItem); taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false); } } } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { +void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { + SFilePage* pPage = NULL; + + int32_t completeRowSize = pSrcBlock->info.rowSize + pSrcBlock->info.numOfCols * sizeof(bool); + + if (pCtx->curBufPage == -1) { + pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); + pPage->num = sizeof(SFilePage); + } else { + pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage); + if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) { + pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); + pPage->num = sizeof(SFilePage); + } + } + + pItem->tuplePos.pageId = pCtx->curBufPage; + + // keep the current row data, extract method + int32_t offset = 0; + bool* nullList = (bool*)((char*)pPage + pPage->num); + char* pStart = (char*)(nullList + sizeof(bool) * pSrcBlock->info.numOfCols); + for (int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); + bool isNull = colDataIsNull_s(pCol, rowIndex); + if (isNull) { + nullList[i] = true; + continue; + } + + char* p = colDataGetData(pCol, rowIndex); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + memcpy(pStart + offset, p, varDataTLen(p)); + } else { + memcpy(pStart + offset, p, pCol->info.bytes); + } + + offset += pCol->info.bytes; + } + + pItem->tuplePos.offset = pPage->num; + pPage->num += completeRowSize; + + setBufPageDirty(pPage, true); + releaseBufPage(pCtx->pBuf, pPage); +} + +void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STopBotResItem* pItem) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pItem->tuplePos.pageId); + + bool* nullList = (bool*)((char*)pPage + pItem->tuplePos.offset); + char* pStart = (char*)(nullList + pSrcBlock->info.numOfCols * sizeof(bool)); + + int32_t offset = 0; + for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i); + if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { + continue; + } + + char* p = colDataGetData(pCol, rowIndex); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + memcpy(pStart + offset, p, varDataTLen(p)); + } else { + memcpy(pStart + offset, p, pCol->info.bytes); + } + + offset += pCol->info.bytes; + } + + setBufPageDirty(pPage, true); + releaseBufPage(pCtx->pBuf, pPage); +} + +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); pEntryInfo->complete = true; int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); // todo assign the tag value and the corresponding row data @@ -1320,19 +1389,45 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId case TSDB_DATA_TYPE_INT: { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { STopBotResItem* pItem = &pRes->pItems[i]; - colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i); + colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i); int32_t pageId = pItem->tuplePos.pageId; int32_t offset = pItem->tuplePos.offset; - if (pageId != -1) { - // todo + if (pItem->tuplePos.pageId != -1) { + SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); + + bool* nullList = (bool*)((char*)pPage + offset); + char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool)); + + // todo set the offset value to optimize the performance. + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + + SFunctParam *pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId; + + int32_t ps = 0; + for(int32_t k = 0; k < srcSlotId; ++k) { + SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); + ps += pSrcCol->info.bytes; + } + + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + if (nullList[srcSlotId]) { + colDataAppendNULL(pDstCol, currentRow); + } else { + colDataAppend(pDstCol, currentRow, (pStart + ps), false); + } + } } + + currentRow += 1; } + break; } } return pEntryInfo->numOfRes; - -// return functionFinalize(pCtx, pBlock, slotId); } \ No newline at end of file -- GitLab