diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6f933c663eba534250bc9cb77cfd545212551dd8..15f3246013c5d289aa6689d55de162c435153344 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -203,11 +203,10 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void blockDataCleanup(SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); -void* blockDataDestroy(SSDataBlock* pBlock); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 47082395e94fda000326685ee6749e2d3d32b9cf..699e9a598765bcb137c1f22b21164937082c4e2d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -408,7 +408,11 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { } SResultColumn *pCol = &pResultInfo->pCol[col]; - return colDataIsNull_f(pCol->nullbitmap, row); + if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) { + return (pCol->offset[row] == -1); + } else { + return colDataIsNull_f(pCol->nullbitmap, row); + } } bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 053a1a1ee6a802da00350cc08b429191d6ffd894..a4c3b2381303da206cdbf9dbc7b5863333eea2fc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1149,10 +1149,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } -SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if(pDataBlock == NULL){ return NULL; } + int32_t numOfCols = pDataBlock->info.numOfCols; SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -1160,6 +1161,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; + pBlock->info.rowSize = pDataBlock->info.rows; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1168,6 +1170,23 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { taosArrayPush(pBlock->pDataBlock, &colInfo); } + if (copyData) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); + + int32_t code = colInfoDataEnsureCapacity(pDst, pDataBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + colDataAssign(pDst, pSrc, pDataBlock->info.rows); + } + + pBlock->info.rows = pDataBlock->info.rows; + pBlock->info.capacity = pDataBlock->info.rows; + } + return pBlock; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f4c91b6bfb93f76a4b847130efc2dad1e8e6842b..cbd2dc66f5f9dfa8e2c7219ab2a67e659ec3779f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -468,10 +468,10 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char *pData; - bool isNull; - int16_t type; - int32_t bytes; + char *pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { @@ -497,19 +497,19 @@ typedef struct SDataGroupInfo { // The sort in partition may be needed later. typedef struct SPartitionOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SHashObj* pGroupSet; // quick locate the window object for each result - - SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file - int32_t rowCapacity; // maximum number of rows for each buffer page - int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -548,21 +548,21 @@ typedef struct SStateWindowOperatorInfo { } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { - SOptrBasicInfo binfo; - bool hasVarCol; + SOptrBasicInfo binfo; + bool hasVarCol; - SArray* pSortInfo; - int32_t numOfSources; - SSortHandle *pSortHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t resultRowFactor; - bool hasGroupVal; - SDiskbasedBuf *pTupleStore; // keep the final results - int32_t numOfResPerPage; - char** groupVal; - SArray *groupInfo; - SAggSupporter aggSup; + SArray* pSortInfo; + int32_t numOfSources; + SSortHandle *pSortHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + int32_t resultRowFactor; + bool hasGroupVal; + SDiskbasedBuf *pTupleStore; // keep the final results + int32_t numOfResPerPage; + char** groupVal; + SArray *groupInfo; + SAggSupporter aggSup; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { @@ -590,8 +590,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); -int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, - int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 15203d91cafc3611fd316c91db8f0f4a895cfb02..4af7e563e6eb85dceae04be58c069c5087060b49 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -52,7 +52,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SSDataBlock* p = createOneDataBlock(pDataBlock); + SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; taosArrayClear(p->pDataBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f5cf130398122e6afb167d1bd47ae8de0a9d5c0f..33520475e439080442b03d484d20cc421bcc8831 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1701,9 +1701,8 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, - int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, - SAggSupporter* pAggSup) { +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, + int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup) { SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo; SqlFunctionCtx* pCtx = binfo->pCtx; @@ -3039,7 +3038,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); - SSDataBlock* px = createOneDataBlock(pBlock); + SSDataBlock* px = createOneDataBlock(pBlock, false); blockDataEnsureCapacity(px, pBlock->info.rows); // todo extract method @@ -4614,7 +4613,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { SSortedMergeOperatorInfo* pInfo = pOperator->info; SSortHandle* pHandle = pInfo->pSortHandle; - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes); + SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); while (1) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c8a6697d4e944c44923ba0a09f8f995a266eeeef..cd79d719dad44f51cfa407889c06837bdf56850b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -227,7 +227,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -244,7 +244,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = - setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29c7283d393ee9b03d7cf84c6ed2fcc229ef53d1..59ef31b2ff494ae766463908a8a16f62e5cc442a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -590,7 +590,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); filterFreeInfo(filter); - SSDataBlock* px = createOneDataBlock(pInfo->pRes); + SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(px, pInfo->pRes->info.rows); // TODO refactor diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 885112d440aac01f5bdfad1018fd5bb2ca5feac7..2ac28200e79d3b365fc75d055f785e0a99963355 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -99,7 +99,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t pSortHandle->numOfPages = numOfPages; pSortHandle->pSortInfo = pSortInfo; pSortHandle->pIndexMap = pIndexMap; - pSortHandle->pDataBlock = createOneDataBlock(pBlock); + pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; @@ -206,7 +206,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataCleanup(pDataBlock); - SSDataBlock* pBlock = createOneDataBlock(pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); } @@ -488,7 +488,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); if (code != 0) { return code; @@ -531,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { } if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock); + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);