From f700e4746c1cfa1fcdfc7633f84b4bda69117c1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 8 Apr 2022 13:09:44 +0800 Subject: [PATCH] [td-14393] fix bug. --- source/libs/executor/inc/executorimpl.h | 4 +- source/libs/executor/src/executorimpl.c | 4 + source/libs/executor/src/groupoperator.c | 97 +++++++++++++++++------- source/util/src/tpagedbuf.c | 4 +- 4 files changed, 76 insertions(+), 33 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 908817f2b8..b4302787fb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -574,11 +574,13 @@ typedef struct SPartitionOperatorInfo { SOptrBasicInfo binfo; SArray* pGroupCols; SArray* pGroupColVals; // current group column values, SArray - bool isInit; // denote if current val is initialized or not char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width SHashObj* pGroupSet; // quick locate the window object for each result + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data void* pGroupIter; // group iterator int32_t pageIndex; // page index of current group diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 046049c50e..5fc7e81bd5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -298,6 +298,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { idata.info.precision = pDescNode->dataType.precision; taosArrayPush(pBlock->pDataBlock, &idata); + + if (IS_VAR_DATA_TYPE(idata.info.type)) { + pBlock->info.hasVarCol = true; + } } return pBlock; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 7295401242..e9b0497587 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -25,6 +25,8 @@ #include "thash.h" #include "ttypes.h" +static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); + static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -357,15 +359,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); for (int32_t j = 0; j < pBlock->info.rows; ++j) { - // Compare with the previous row of this column, and do not set the output buffer again if they are identical. recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096); SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); - void* pPage = NULL; + void* pPage = NULL; if (p == NULL) { // it is a new group SDataGroupInfo gi = {0}; gi.pPageList = taosArrayInit(100, sizeof(int32_t)); @@ -383,7 +383,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pPage = getBufPage(pInfo->pBuf, *curId); int32_t *rows = (int32_t*) pPage; - if (*rows >= numOfRows) { + if (*rows >= pInfo->rowCapacity) { // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); @@ -393,43 +393,54 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } } - int32_t* rows = (int32_t*) pPage; - (*rows) += 1; + // add one for this group + p->numOfRows += 1; - int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); - offset[0] = sizeof(int32_t); + int32_t* rows = (int32_t*) pPage; - int32_t numOfCols = pInfo->binfo.pRes->info.numOfCols; - for(int32_t i = 0; i < numOfCols - 1; ++i) { + size_t numOfCols = pInfo->binfo.pRes->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - offset[i + 1] = pColInfoData->info.bytes * numOfRows + numOfRows * sizeof(int32_t) + sizeof(int32_t) + offset[i]; - } else { - offset[i + 1] = pColInfoData->info.bytes * numOfRows + BitmapLen(numOfRows) + sizeof(int32_t) + offset[i]; - } - } + int32_t bytes = pColInfoData->info.bytes; + int32_t startOffset = pInfo->columnOffset[i]; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + char* columnLen = NULL; + int32_t contentLen = 0; if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - // todo + int32_t* offset = pPage + startOffset; + columnLen = pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; + char* data = (char*)(columnLen + sizeof(int32_t)); + + if (colDataIsNull_s(pColInfoData, j)) { + offset[(*rows)] = -1; + contentLen = 0; + } else { + offset[*rows] = (*columnLen); + char* src = colDataGetData(pColInfoData, j); + memcpy(data + (*columnLen), src, varDataTLen(src)); + contentLen = varDataTLen(src); + } } else { - char* bitmap = pPage + offset[i]; - int32_t* lenx = pPage + offset[i] + BitmapLen(numOfRows); - char* data = (char*) lenx + sizeof(int32_t); + char* bitmap = pPage + startOffset; + columnLen = pPage + startOffset + BitmapLen(pInfo->rowCapacity); + char* data = (char*) columnLen + sizeof(int32_t); - (*lenx )+= pColInfoData->info.bytes; bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); if (isNull) { - colDataSetNull_f(bitmap, (*rows) - 1); + colDataSetNull_f(bitmap, (*rows)); } else { - memcpy(data + ((*rows) - 1)* pColInfoData->info.bytes, colDataGetData(pColInfoData, j), pColInfoData->info.bytes); + memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes); } + contentLen = bytes; } + + (*columnLen) += contentLen; } + (*rows) += 1; + setBufPageDirty(pPage, true); releaseBufPage(pInfo->pBuf, pPage); } @@ -437,6 +448,30 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { // todo set the consistent group id according to the group keys } +int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { + size_t numOfCols = pBlock->info.numOfCols; + int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); + + offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format + + for(int32_t i = 0; i < numOfCols - 1; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int32_t bytes = pColInfoData->info.bytes; + int32_t payloadLen = bytes * rowCapacity; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + // offset segment + content length + payload + offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i]; + } else { + // bitmap + content length + payload + offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i]; + } + } + + return offset; +} + static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; @@ -445,6 +480,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { // try next group data pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); if (pInfo->pGroupIter == NULL) { + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -455,8 +491,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); void* page = getBufPage(pInfo->pBuf, *pageId); - int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096); - blockDataFromBuf1(pInfo->binfo.pRes, page, numOfRows); + blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); pInfo->pageIndex += 1; return pInfo->binfo.pRes; @@ -495,11 +530,12 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { } static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { - SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; + SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - taosMemoryFreeClear(pInfo->keyBuf); taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupColVals); + taosMemoryFree(pInfo->keyBuf); + taosMemoryFree(pInfo->columnOffset); } SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList, @@ -523,6 +559,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc goto _error; } + pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d834263b94..84a2efb46c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -48,10 +48,8 @@ struct SDiskbasedBuf { }; static int32_t createDiskFile(SDiskbasedBuf* pBuf) { - // pBuf->file = fopen(pBuf->path, "wb+"); - pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (pBuf->pFile == NULL) { - // qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } -- GitLab