提交 6d37b663 编写于 作者: H Haojun Liao

[td-11818] refactor sortedmerge operator.

上级 bbc03a17
......@@ -47,6 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange)
OP_ENUM_MACRO(SortMerge)
OP_ENUM_MACRO(SortedMerge)
//OP_ENUM_MACRO(TableScan)
......@@ -550,22 +550,20 @@ typedef struct SDistinctOperatorInfo {
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
typedef struct SSortMergeOperatorInfo {
SOptrBasicInfo binfo;
typedef struct SSortedMergeOperatorInfo {
SSDataBlock *pDataBlock;
bool hasVarCol;
SArray *orderInfo; // SArray<SBlockOrderInfo>
SArray* groupColumnList;
bool hasDataBlockForNewGroup;
char** currentGroupColData;
SArray* udfInfo;
bool nullFirst;
int32_t numOfSources;
} SSortMergeOperatorInfo;
typedef struct SMsortComparParam {
struct SExternalMemSource **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
SSortHandle *pSortHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
int32_t numOfRowsInRes;
} SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort
......@@ -628,8 +626,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal);
SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -45,6 +45,13 @@ typedef struct SOperatorSource {
void* param;
} SOperatorSource;
// Context handed to the multi-way sort comparator as its opaque `param` argument.
typedef struct SMsortComparParam {
// Array of sort sources; the comparator receives int32_t indexes and looks the
// sources up here. NOTE(review): element type is erased to void* - confirm the concrete
// source type against the sort implementation.
void **pSources;
// Presumably the number of entries in pSources - TODO confirm against the code that fills it.
int32_t numOfSources;
// Sort columns, walked in array order by the comparator.
SArray *orderInfo; // SArray<SBlockOrderInfo>
// When true a NULL value is ordered before a non-NULL value, otherwise after it.
bool nullFirst;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
typedef struct STupleHandle STupleHandle;
......
......@@ -4521,13 +4521,13 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) {
// return true;
//}
void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
void appendDownstream(SOperatorInfo* p, SOperatorInfo* pDownstream) {
if (p->pDownstream == NULL) {
assert(p->numOfDownstream == 0);
}
p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfDownstream + 1));
p->pDownstream[p->numOfDownstream++] = pUpstream;
p->pDownstream[p->numOfDownstream++] = pDownstream;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
......@@ -5599,11 +5599,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
return pOrderColumns;
}
static void destroySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortMergeOperatorInfo *pInfo = (SSortMergeOperatorInfo*) param;
destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->groupColumnList);
tfree(pInfo->currentGroupColData);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
}
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -5624,153 +5620,6 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) {
return p;
}
static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortMergeOperatorInfo* pInfo = pOperator->info;
SMsortComparParam resultParam = {.orderInfo = pInfo->orderInfo};
SArray* pSource = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup);
// doAddNewOperatorSource(pSource, pBlock, pInfo->binfo.capacity);
}
// sortComparInit(&resultParam, pSource, 0, pInfo->numOfSources - 1, pOperator);
// int32_t code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn);
// if (code != TSDB_CODE_SUCCESS) {
// longjmp(pTaskInfo->env, code);
// }
return NULL;
}
/**
 * Create a "SortMerge" operator on top of a single downstream operator.
 *
 * The result buffer is created with a capacity of 4096 rows and the function contexts are
 * built from pExprInfo.
 *
 * TODO: the per-group column buffer (currentGroupColData) and the default output buffer
 * (setDefaultOutputBuf_rv) are not set up yet.
 *
 * @param downstream  operator that feeds this one; appended as the only downstream
 * @param pExprInfo   output expressions; its size becomes numOfOutput
 * @param param       not used by this function
 * @param pUdfInfo    stored in the operator info as-is (pointer is not copied)
 * @param pTaskInfo   task the operator belongs to
 * @return the new operator, or NULL when the operator structures cannot be allocated
 */
SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo) {
  SSortMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortMergeOperatorInfo));
  SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // free(NULL) is a no-op, so whichever allocation succeeded is released.
    free(pInfo);
    free(pOperator);
    return NULL;
  }

  int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo);

  pInfo->binfo.capacity = 4096;
  pInfo->udfInfo = pUdfInfo;
  pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
  pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);

  initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);

  pOperator->name = "SortMerge";
  pOperator->operatorType = OP_SortMerge;
  pOperator->blockingOptr = true;
  pOperator->status = OP_IN_EXECUTING;
  pOperator->info = pInfo;
  pOperator->pExpr = exprArrayDup(pExprInfo);
  pOperator->numOfOutput = numOfOutput;
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->exec = doSortedMerge;
  pOperator->cleanupFn = destroySortMergeOperatorInfo;

  appendDownstream(pOperator, downstream);
  return pOperator;
}
/**
 * Compare the current rows of two sort sources, column by column.
 *
 * A source whose rowIndex is -1 is exhausted and is ordered after any other source.
 * For every entry of pParam->orderInfo the values at the sources' current rows are compared:
 * two NULLs are treated as equal for that column, a single NULL is placed according to
 * pParam->nullFirst, and non-NULL values are ordered according to the entry's order flag.
 * Only TSDB_DATA_TYPE_INT columns are supported; any other type trips an assertion.
 *
 * @param pLeft   pointer to the int32_t index of the left source in pParam->pSources
 * @param pRight  pointer to the int32_t index of the right source in pParam->pSources
 * @param param   the SMsortComparParam describing sources and sort order
 * @return -1 when the left row sorts first, 1 when the right row sorts first,
 *         0 when the rows are equal on every sort column
 */
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
  int32_t pLeftIdx  = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

  SMsortComparParam *pParam = (SMsortComparParam *)param;
  SArray *pInfo = pParam->orderInfo;

  SExternalMemSource* pLeftSource  = pParam->pSources[pLeftIdx];
  SExternalMemSource* pRightSource = pParam->pSources[pRightIdx];

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock  = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

  for (int32_t i = 0; i < pInfo->size; ++i) {
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);

    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex);

    bool leftNull = false;
    if (pLeftColInfoData->hasNull) {
      leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
    }

    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex);

    bool rightNull = false;
    if (pRightColInfoData->hasNull) {
      rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
    }

    if (leftNull && rightNull) {
      continue; // continue to next slot
    }

    if (rightNull) {
      return pParam->nullFirst? 1:-1;
    }

    if (leftNull) {
      return pParam->nullFirst? -1:1;
    }

    void* left1  = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex);
    void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex);

    switch(pLeftColInfoData->info.type) {
      case TSDB_DATA_TYPE_INT: {
        int32_t leftv  = *(int32_t*)left1;
        int32_t rightv = *(int32_t*)right1;

        if (leftv == rightv) {
          break;  // tie on this column, let the next sort column decide
        } else {
          if (pOrder->order == TSDB_ORDER_ASC) {
            return leftv < rightv? -1 : 1;
          } else {
            return leftv < rightv? 1 : -1;
          }
        }
      }
      default:
        assert(0);
        break;
    }
  }

  // Every sort column compared equal (or both sides were NULL). Without this return the
  // function ran off its end and handed an indeterminate value to the caller.
  return 0;
}
static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHandle) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
......@@ -5787,80 +5636,59 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
pBlock->info.rows += 1;
}
/**
 * Create an empty data block that has the same column layout as pDataBlock.
 *
 * Only the column descriptors (SColumnInfoData.info) are copied; every other member of the
 * new columns and of the new block is zero-initialised.
 *
 * @param pDataBlock  template block, must not be NULL
 * @return the new block, or NULL when the block or its column array cannot be allocated
 */
static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) {
  int32_t numOfCols = pDataBlock->info.numOfCols;

  SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    // Do not hand out a block whose column array is missing.
    free(pBlock);
    return NULL;
  }

  pBlock->info.numOfCols = numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData colInfo = {0};
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    colInfo.info = p->info;
    taosArrayPush(pBlock->pDataBlock, &colInfo);
  }

  return pBlock;
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SOrderOperatorInfo* pInfo, int32_t capacity) {
blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol);
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
blockDataClearup(pDataBlock, hasVarCol);
while(1) {
STupleHandle* pTupleHandle = sortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
STupleHandle* pTupleHandle = sortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(pInfo->pDataBlock, pTupleHandle);
if (pInfo->pDataBlock->info.rows >= capacity) {
return pInfo->pDataBlock;
appendOneRowToDataBlock(pDataBlock, pTupleHandle);
if (pDataBlock->info.rows >= capacity) {
return pDataBlock;
}
}
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
return (pDataBlock->info.rows > 0)? pDataBlock:NULL;
}
static SSDataBlock* loadNextDataBlock(void* param) {
bool newgroup = false;
SOperatorInfo* pOperator = (SOperatorInfo*) param;
return pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], &newgroup);
SSDataBlock* loadNextDataBlock(void* param) {
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOrderOperatorInfo* pInfo = pOperator->info;
SSortedMergeOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
int64_t st = taosGetTimestampUs();
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
setComparFn(pInfo->pSortHandle, msortComparFn);
sortAddSource(pInfo->pSortHandle, pOperator);
for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
sortAddSource(pInfo->pSortHandle, pOperator->pDownstream[i]);
}
// TODO set error code;
int32_t code = sortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
......@@ -5886,7 +5714,77 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) {
return pOrderInfo;
}
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal) {
/**
 * Create a "SortedMerge" operator that reads from several downstream operators.
 *
 * The operator is configured with fixed defaults: a 16 KiB sort buffer, 1 KiB buffer pages
 * and result blocks of 1024 rows. The result block is created from pExprInfo and the sort
 * order from pExprInfo/pOrderVal.
 *
 * NOTE(review): pOperator->pExpr and pOperator->numOfOutput are left zeroed here - confirm
 * that no caller relies on them for this operator type.
 *
 * @param downstream       array of downstream operators, appended in array order
 * @param numOfDownstream  number of entries in downstream
 * @param pExprInfo        output expressions
 * @param param            not used by this function
 * @param pOrderVal        order-by description passed to createBlockOrder
 * @param pTaskInfo        task the operator belongs to
 * @return the new operator, or NULL with terrno set to TSDB_CODE_QRY_OUT_OF_MEMORY when
 *         the operator structures cannot be allocated
 */
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, void* param, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
  SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // Release both: only one of the two allocations may have failed.
    tfree(pInfo);
    tfree(pOperator);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->sortBufSize = 1024 * 16;  // 16 KiB
  pInfo->bufPageSize = 1024;
  pInfo->numOfRowsInRes = 1024;

  pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes);
  pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal);

  pOperator->name = "SortedMerge";
  pOperator->operatorType = OP_SortedMerge;
  pOperator->blockingOptr = true;
  pOperator->status = OP_IN_EXECUTING;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->exec = doSortedMerge;
  pOperator->cleanupFn = destroySortedMergeOperatorInfo;

  for (int32_t i = 0; i < numOfDownstream; ++i) {
    appendDownstream(pOperator, downstream[i]);
  }

  return pOperator;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOrderOperatorInfo* pInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
tfree(p);
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
sortAddSource(pInfo->pSortHandle, pOperator);
// TODO set error code;
int32_t code = sortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
}
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5925,6 +5823,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSort;
pOperator->cleanupFn = destroyOrderOperatorInfo;
......
......@@ -23,13 +23,6 @@
#include "tpagedbuf.h"
#include "tutil.h"
// Context handed to the multi-way sort comparator as its opaque `param` argument.
typedef struct SMsortComparParam {
// Array of sort sources; the comparator receives int32_t indexes and looks the
// sources up here. NOTE(review): element type is erased to void* - confirm the concrete
// source type against the code that fills this array.
void **pSources;
// Presumably the number of entries in pSources - TODO confirm against the code that fills it.
int32_t numOfSources;
// Sort columns, walked in array order by the comparator.
SArray *orderInfo; // SArray<SBlockOrderInfo>
// When true a NULL value is ordered before a non-NULL value, otherwise after it.
bool nullFirst;
} SMsortComparParam;
typedef struct STupleHandle {
SSDataBlock* pBlock;
int32_t rowIndex;
......@@ -72,7 +65,9 @@ typedef struct SSortHandle {
STupleHandle tupleHandle;
} SSortHandle;
SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pBlock->info.numOfCols = numOfCols;
......@@ -107,6 +102,7 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type,
pSortHandle->cmpParam.orderInfo = pOrderInfo;
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
setComparFn(pSortHandle, msortComparFn);
if (idstr != NULL) {
pSortHandle->idStr = strdup(idstr);
......@@ -365,7 +361,7 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam*
return (pHandle->pDataBlock->info.rows > 0)? pHandle->pDataBlock:NULL;
}
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
......
......@@ -298,7 +298,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = NULL;
......
......@@ -238,7 +238,6 @@ TEST(testCase, ordered_merge_sort_Test) {
setFetchRawDataFp(phandle, getSingleColDummyBlock);
setComparFn(phandle, docomp);
for(int32_t i = 0; i < 10; ++i) {
SOperatorSource* p = static_cast<SOperatorSource*>(calloc(1, sizeof(SOperatorSource)));
_info* c = static_cast<_info*>(calloc(1, sizeof(_info)));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册