提交 f3baaf1a 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -696,10 +696,6 @@ typedef struct SSortedMergeOperatorInfo {
int32_t numOfResPerPage; int32_t numOfResPerPage;
char** groupVal; char** groupVal;
SArray *groupInfo; SArray *groupInfo;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
...@@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo { ...@@ -712,10 +708,6 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
STupleHandle *prefetchedTuple;
bool hasGroupId;
uint64_t groupId;
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo { typedef struct STagFilterOperatorInfo {
......
...@@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId); ...@@ -130,12 +130,6 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
*/ */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId); void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
/**
*
* @param pVHandle
* @return
*/
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
/** /**
* *
* @param pSortHandle * @param pSortHandle
......
...@@ -3031,31 +3031,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo ...@@ -3031,31 +3031,12 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -3074,7 +3055,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo ...@@ -3074,7 +3055,6 @@ SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
pDataBlock->info.rows = p->info.rows; pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows; pDataBlock->info.capacity = p->info.rows;
pDataBlock->info.groupId = pInfo->groupId;
} }
blockDataDestroy(p); blockDataDestroy(p);
...@@ -3340,7 +3320,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -3340,7 +3320,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
return (rows == 0) ? NULL : pInfo->pRes; return (rows == 0) ? NULL : pInfo->pRes;
...@@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4920,7 +4900,10 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
} }
SArray* extractPartitionColInfo(SNodeList* pNodeList) { SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if (!pNodeList) return NULL; if(!pNodeList) {
return NULL;
}
size_t numOfCols = LIST_LENGTH(pNodeList); size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) { if (pList == NULL) {
......
...@@ -2176,25 +2176,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2176,25 +2176,13 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
pTupleHandle = tsortNextTuple(pHandle); pTupleHandle = tsortNextTuple(pHandle);
} else { } else {
pTupleHandle = pInfo->prefetchedTuple; pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
} }
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
......
...@@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR ...@@ -40,15 +40,13 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->hasGroupId = false; pOperator->name = "SortOperator";
pInfo->prefetchedTuple = NULL;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize +
...@@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i ...@@ -97,31 +95,12 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i ...@@ -140,7 +119,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
pDataBlock->info.rows = p->info.rows; pDataBlock->info.rows = p->info.rows;
pDataBlock->info.capacity = p->info.rows; pDataBlock->info.capacity = p->info.rows;
pDataBlock->info.groupId = pInfo->groupId;
} }
blockDataDestroy(p); blockDataDestroy(p);
...@@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo { ...@@ -255,10 +233,7 @@ typedef struct SMultiwaySortMergeOperatorInfo {
SSDataBlock* pInputBlock; SSDataBlock* pInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t groupId;
bool hasGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
} SMultiwaySortMergeOperatorInfo; } SMultiwaySortMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
...@@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData ...@@ -312,31 +287,12 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->groupId = tsortGetGroupId(pTupleHandle);
pInfo->prefetchedTuple = NULL;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (!pInfo->hasGroupId) {
pInfo->groupId = tupleGroupId;
pInfo->hasGroupId = true;
appendOneRowToDataBlock(p, pTupleHandle);
} else if (pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
...@@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, ...@@ -432,14 +388,12 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
uint32_t numOfSources = taosArrayGetSize(pSortInfo); uint32_t numOfSources = taosArrayGetSize(pSortInfo);
numOfSources = TMAX(2, numOfSources); numOfSources = TMAX(4, numOfSources);
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize; pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
......
...@@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) { ...@@ -557,59 +557,40 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
bool hasGroupId = false;
SSDataBlock* prefetchedDataBlock = NULL;
while (1) { while (1) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (prefetchedDataBlock == NULL) {
pBlock = pHandle->fetchfp(source->param);
} else {
pBlock = prefetchedDataBlock;
prefetchedDataBlock = NULL;
}
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
if (!hasGroupId) { if (pHandle->pDataBlock == NULL) {
// calculate the buffer pages according to the total available buffers.
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock)); pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
// todo, number of pages are set according to the total available sort buffer // todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024; pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize; sortBufSize = pHandle->numOfPages * pHandle->pageSize;
hasGroupId = true;
pHandle->pDataBlock = createOneDataBlock(pBlock, false); pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) { if (pHandle->beforeFp != NULL) {
// perform the scalar function calculation before apply the sort pHandle->beforeFp(pBlock, pHandle->param);
if (pHandle->beforeFp != NULL) { }
pHandle->beforeFp(pBlock, pHandle->param);
}
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) { if (code != 0) {
return code; return code;
} }
size_t size = blockDataGetSize(pHandle->pDataBlock); size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) { if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
doAddToBuf(pHandle->pDataBlock, pHandle); doAddToBuf(pHandle->pDataBlock, pHandle);
}
} else {
prefetchedDataBlock = pBlock;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
} }
...@@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { ...@@ -758,10 +739,6 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
} }
} }
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
return pVHandle->pBlock->info.groupId;
}
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
SSortExecInfo info = {0}; SSortExecInfo info = {0};
......
Subproject commit 0a81480420d6601bbdb57770ee64e40f24c4ea83 Subproject commit 3d5aa76f8c718dcffa100b45e4cbf313d499c356
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册