diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 772b1f69a329054cd255ff486d7504eb0be29fa6..caf82924f403582094e37ae9ffa80969433d84f4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -131,44 +131,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con memcpy(pColumnInfoData->pData + len, pData, varDataTLen(pData)); pColumnInfoData->varmeta.length += varDataTLen(pData); } else { - char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; - switch (type) { - case TSDB_DATA_TYPE_BOOL: { - *(bool*)p = *(bool*)pData; - break; - } - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: { - *(int8_t*)p = *(int8_t*)pData; - break; - } - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: { - *(int16_t*)p = *(int16_t*)pData; - break; - } - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: { - *(int32_t*)p = *(int32_t*)pData; - break; - } - case TSDB_DATA_TYPE_TIMESTAMP: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: { - *(int64_t*)p = *(int64_t*)pData; - break; - } - case TSDB_DATA_TYPE_FLOAT: { - *(float*)p = *(float*)pData; - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - *(double*)p = *(double*)pData; - break; - } - default: - assert(0); - } + memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow, pData, pColumnInfoData->info.bytes); } return 0; @@ -562,6 +525,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { + char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->varmeta.offset = (int32_t*)tmp; memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { @@ -738,61 +706,12 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]]; } } else { - switch (pSrc->info.type) { - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: { - for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { - int32_t* p = (int32_t*)pDst->pData; - int32_t* srclist = (int32_t*)pSrc->pData; - - p[j] = srclist[index[j]]; - if (colDataIsNull_f(pSrc->nullbitmap, index[j])) { - colDataSetNull_f(pDst->nullbitmap, j); - } - } - break; - } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_TINYINT: { - for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { - int32_t* p = (int32_t*)pDst->pData; - int32_t* srclist = (int32_t*)pSrc->pData; - - p[j] = srclist[index[j]]; - if (colDataIsNull_f(pSrc->nullbitmap, index[j])) { - colDataSetNull_f(pDst->nullbitmap, j); - } - } - break; - } - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_SMALLINT: { - for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { - int32_t* p = (int32_t*)pDst->pData; - int32_t* srclist = (int32_t*)pSrc->pData; - - p[j] = srclist[index[j]]; - if (colDataIsNull_f(pSrc->nullbitmap, index[j])) { - colDataSetNull_f(pDst->nullbitmap, j); - } - } - break; - } - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_BIGINT: { - for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { - int32_t* p = (int32_t*)pDst->pData; - int32_t* srclist = (int32_t*)pSrc->pData; - - p[j] = srclist[index[j]]; - if (colDataIsNull_f(pSrc->nullbitmap, index[j])) { - colDataSetNull_f(pDst->nullbitmap, j); - } - } - break; + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + if (colDataIsNull_f(pSrc->nullbitmap, index[j])) { + colDataSetNull_f(pDst->nullbitmap, j); + continue; } - default: - assert(0); + memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes); } } } @@ -938,12 +857,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { int64_t p2 = taosGetTimestampUs(); - int32_t code = blockDataAssign(pCols, pDataBlock, index); - if (code != TSDB_CODE_SUCCESS) { - destroyTupleIndex(index); - terrno = code; - return code; - } + blockDataAssign(pCols, pDataBlock, index); int64_t p3 = taosGetTimestampUs(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2ccc92111ae9d402177aa092c2c7811fccbe61f7..f960993e3e9714074196d7ed359717ca93da7dac 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5769,10 +5769,11 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan pBlock->info.rows += 1; } -static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, - int32_t capacity) { +static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { blockDataCleanup(pDataBlock); + blockDataEnsureCapacity(pDataBlock, capacity); + while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { @@ -5971,7 +5972,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -6116,10 +6117,9 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -6139,7 +6139,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 511cba972c61db91f0e3af2d3b257505e7ab1ef5..49a99ec284927ac89e8e845ace17282c689df5b2 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -111,6 +111,17 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page return pSortHandle; } +static int32_t sortComparClearup(SMsortComparParam* cmpParam) { + for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam->pSources[i]; + blockDataDestroy(pSource->src.pBlock); + taosMemoryFreeClear(pSource); + } + + cmpParam->numOfSources = 0; + return TSDB_CODE_SUCCESS; +} + void tsortDestroySortHandle(SSortHandle* pSortHandle) { tsortClose(pSortHandle); if (pSortHandle->pMergeTree != NULL) { @@ -119,6 +130,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { destroyDiskbasedBuf(pSortHandle->pBuf); taosMemoryFreeClear(pSortHandle->idStr); + blockDataDestroy(pSortHandle->pDataBlock); + sortComparClearup(&pSortHandle->cmpParam); // pOrderedSource is in cmpParam taosMemoryFreeClear(pSortHandle); } @@ -168,6 +181,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); if (pPage == NULL) { + blockDataDestroy(p); return terrno; } @@ -227,17 +241,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int return code; } -static int32_t sortComparClearup(SMsortComparParam* cmpParam) { - for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { - SExternalMemSource* pSource = cmpParam->pSources[i]; - blockDataDestroy(pSource->src.pBlock); - taosMemoryFreeClear(pSource); - } - - cmpParam->numOfSources = 0; - return TSDB_CODE_SUCCESS; -} - static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); @@ -312,7 +315,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa return TSDB_CODE_SUCCESS; } -static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { +static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { blockDataCleanup(pHandle->pDataBlock); while(1) { @@ -454,7 +457,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } while (1) { - SSDataBlock* pDataBlock = getSortedBlockData(pHandle, &pHandle->cmpParam, numOfRows); + SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; } diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 7e2fd0fed29474d598fdfd7da5f5f6d503ac9fec..0674f05f332aed7f05e78c11bff3f3e730bd2b41 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -198,6 +198,7 @@ TEST(testCase, inMem_sort_Test) { int32_t code = tsortOpen(phandle); int32_t row = 1; + taosMemoryFreeClear(ps); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(phandle); @@ -235,6 +236,7 @@ TEST(testCase, external_mem_sort_Test) { int32_t code = tsortOpen(phandle); int32_t row = 1; + taosMemoryFreeClear(ps); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(phandle); @@ -245,8 +247,8 @@ TEST(testCase, external_mem_sort_Test) { void* v = tsortGetValue(pTupleHandle, 0); printf("%d: %d\n", row, *(int32_t*) v); ASSERT_EQ(row++, *(int32_t*) v); - //char buf[64] = {0}; - //snprintf(buf, varDataLen(v), "%s", varDataVal(v)); + char buf[64] = {0}; + memcpy(buf, varDataVal(v), varDataLen(v)); //printf("%d: %s\n", row, buf); } tsortDestroySortHandle(phandle);