diff --git a/include/common/tcommon.h b/include/common/tcommon.h index b3b47b4c68f5f9d8a55a8510adfd3e78790a8735..b5bd088006b1451d45dd1aa78eac62803cf474d6 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -56,7 +56,7 @@ typedef struct SColumnDataAgg { typedef struct SDataBlockInfo { STimeWindow window; int32_t rows; - int32_t tupleSize; + int32_t rowSize; int32_t numOfCols; union {int64_t uid; int64_t blockId;}; } SDataBlockInfo; @@ -70,10 +70,10 @@ typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { - SColumnDataAgg* pBlockAgg; - SArray* pDataBlock; // SArray - SArray* pConstantList; // SArray, it is a constant/tags value of the corresponding result value. - SDataBlockInfo info; + SColumnDataAgg *pBlockAgg; + SArray *pDataBlock; // SArray + SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. + SDataBlockInfo info; } SSDataBlock; typedef struct SVarColAttr { @@ -218,18 +218,17 @@ typedef struct SColumn { int64_t dataBlockId; }; - char name[TSDB_COL_NAME_LEN]; - int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) union { int16_t colId; int16_t slotId; }; + char name[TSDB_COL_NAME_LEN]; + int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) int16_t type; int32_t bytes; uint8_t precision; uint8_t scale; - // SColumnInfo info; } SColumn; typedef struct SLimit { @@ -254,11 +253,20 @@ typedef struct SFunctParam { } SFunctParam; // the structure for sql function in select clause +typedef struct SResSchame { + int8_t type; + int32_t colId; + int32_t bytes; + int32_t precision; + int32_t scale; + char name[TSDB_COL_NAME_LEN]; +} SResSchema; + +// TODO move away to executor.h typedef struct SExprBasicInfo { - SSchema resSchema; // TODO refactor + SResSchema resSchema; int16_t numOfParams; // argument value of each function SFunctParam *pParam; -// SVariant param[3]; // parameters are not more than 3 } SExprBasicInfo; typedef struct SExprInfo { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 58839a071180c0faf3109e8d0fab65e580765b4b..0e0a5f93339ff48e5f6891c41fb558290b64c7a7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -632,7 +632,7 @@ typedef struct SOrderOperatorInfo { uint64_t totalElapsed; // total elapsed time } SOrderOperatorInfo; -SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cce05f93f9ca8921007596f221a30a426675e593..30a5bd3f80cda4b5a3031c6e42100cf523513fc1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -347,6 +347,7 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pBlock->info.blockId = pNode->dataBlockId; + pBlock->info.rowSize = pNode->resultRowSize; for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; @@ -683,12 +684,6 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t w.ekey = w.skey + pInterval->interval - 1; } } - - /* - * query border check, skey should not be bounded by the query time range, since the value skey will - * be used as the time window index value. So we only change ekey of time window accordingly. - */ -// ASSERT(win->skey <= win->ekey); // todo no need this return w; } @@ -3348,14 +3343,13 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t cleanupResultRowEntry(pEntry); pCtx[i].resultInfo = pEntry; - pCtx[i].pOutput = pData->pData; // todo remove it pCtx[i].currentStage = stage; // set the timestamp output buffer for top/bottom/diff query - int32_t fid = pCtx[i].functionId; - if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; - } +// int32_t fid = pCtx[i].functionId; +// if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) { +// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; +// } } initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); @@ -3663,7 +3657,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.tupleSize); + int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.rowSize); if (ret != TSDB_CODE_SUCCESS) { return; } @@ -5187,9 +5181,10 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { #endif } +// TODO remove it static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); -SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5200,9 +5195,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* return NULL; } - size_t numOfSources = taosArrayGetSize(pSources); + size_t numOfSources = LIST_LENGTH(pSources); - pInfo->pSources = taosArrayDup(pSources); +// pInfo->pSources = taosArrayDup(pSources); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { tfree(pInfo); @@ -5222,8 +5217,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); } - size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = createResultDataBlock(pExprInfo); + size_t size = taosArrayGetSize(pExprInfo); + pInfo->pResult = createResultDataBlock(pExprInfo); pInfo->seqLoadData = true; tsem_init(&pInfo->ready, 0, 0); @@ -5277,11 +5272,12 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { SColumnInfoData colInfoData = {0}; SExprInfo* p = taosArrayGetP(pExprInfo, i); - SSchema* pSchema = &p->base.resSchema; + SResSchema* pSchema = &p->base.resSchema; colInfoData.info.type = pSchema->type; colInfoData.info.colId = pSchema->colId; colInfoData.info.bytes = pSchema->bytes; - + colInfoData.info.scale = pSchema->scale; + colInfoData.info.precision = pSchema->precision; taosArrayPush(pResult, &colInfoData); } @@ -8016,9 +8012,19 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t return TSDB_CODE_SUCCESS; } -SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) { - *resultRowSize = pPhyNode->node.pOutputDataBlockDesc->resultRowSize; +static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { + SResSchema s = {0}; + s.scale = scale; + s.precision = precision; + s.type = type; + s.bytes = bytes; + s.colId = slotId; + strncpy(s.name, name, tListLen(s.name)); + + return s; +} +SArray* createExprInfo(SAggPhysiNode* pPhyNode) { int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); @@ -8038,9 +8044,12 @@ SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) { ASSERT(pTargetNode->slotId == i); SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; - pExp->base.resSchema = createSchema(pFuncNode->node.resType.type, pFuncNode->node.resType.bytes, pTargetNode->slotId, pFuncNode->node.aliasName); - pExp->pExpr->_function.pFunctNode = pFuncNode; + SDataType *pType = &pFuncNode->node.resType; + pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, + pType->scale, pType->precision, pFuncNode->node.aliasName); + + pExp->pExpr->_function.pFunctNode = pFuncNode; strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); // TODO: value parameter needs to be handled @@ -8089,16 +8098,17 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; - size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); - tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); + size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); + tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, + pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { - // SExchangePhysiNode* pEx = (SExchangePhysiNode*) pPhyNode; - // return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); + SExchangePhysiNode* pEx = (SExchangePhysiNode*)pPhyNode; + return createExchangeOperatorInfo(pEx->pSrcEndPoints, NULL, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { - SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. + SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); @@ -8136,8 +8146,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - int32_t resultRowSize = 0; - SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode, &resultRowSize); + SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); }