提交 d6fa853b 编写于 作者: H Haojun Liao

[td-11818] refactor.

上级 36aedfbb
......@@ -47,5 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow)
OP_ENUM_MACRO(AllMultiTableTimeInterval)
OP_ENUM_MACRO(Order)
OP_ENUM_MACRO(Exchange)
OP_ENUM_MACRO(SortMerge)
//OP_ENUM_MACRO(TableScan)
......@@ -549,27 +549,19 @@ typedef struct SDistinctOperatorInfo {
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
struct SGlobalMerger;
typedef struct SMultiwayMergeInfo {
struct SGlobalMerger* pMerge;
SOptrBasicInfo binfo;
int32_t bufCapacity;
int64_t seed;
char** prevRow;
SArray* orderColumnList;
int32_t resultRowFactor;
bool hasGroupColData;
char** currentGroupColData;
SArray* groupColumnList;
bool hasDataBlockForNewGroup;
SSDataBlock* pExistBlock;
SArray* udfInfo;
bool hasPrev;
bool multiGroupResults;
} SMultiwayMergeInfo;
typedef struct SSortMergeOperatorInfo {
SOptrBasicInfo binfo;
SArray *orderInfo; // SArray<SBlockOrderInfo>
SArray* groupColumnList;
bool hasDataBlockForNewGroup;
char** currentGroupColData;
SArray* udfInfo;
int32_t numOfSources;
// char** prevRow;
// int32_t resultRowFactor;
// bool multiGroupResults;
// bool hasGroupColData;
} SSortMergeOperatorInfo;
typedef struct SMsortComparParam {
struct SExternalMemSource **pSources;
......@@ -592,6 +584,7 @@ typedef struct SOrderOperatorInfo {
SMsortComparParam cmpParam;
// TODO extact struct
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote
......@@ -642,8 +635,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal);
SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput,
SOrder* pOrderVal);
SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo);
// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -5600,12 +5600,9 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
}
static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param;
SSortMergeOperatorInfo *pInfo = (SSortMergeOperatorInfo*) param;
destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->orderColumnList);
taosArrayDestroy(pInfo->groupColumnList);
tfree(pInfo->prevRow);
tfree(pInfo->currentGroupColData);
}
......@@ -5616,118 +5613,82 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow);
}
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->resultRowFactor =
(int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
pInfo->multiGroupResults = groupResultMixedUp;
pInfo->pMerge = param;
pInfo->bufCapacity = 4096;
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
// TODO refactor
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
// len += pExpr[i].base.;
static SExprInfo* exprArrayDup(SArray* pExprInfo) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
assignExprInfo(&p[i], pExpr);
}
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
return p;
}
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
offset += pExpr[index->colIndex].base.resSchema.bytes;
static SSDataBlock* doSortMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0;
pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len));
offset = POINTER_BYTES * numOfCols;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortMergeOperatorInfo* pInfo = pOperator->info;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset;
for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup);
SColIndex* index = taosArrayGet(pInfo->groupColumnList, i);
offset += pExpr[index->colIndex].base.resSchema.bytes;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->seed = rand();
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE);
return NULL;
}
SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo) {
SSortMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortMergeOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate";
// pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
// pOperator->exec = doGlobalAggregate;
pOperator->cleanupFn = destroyGlobalAggOperatorInfo;
appendDownstream(pOperator, downstream);
return pOperator;
}
SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
// pInfo->resultRowFactor =
// (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo->pMerge = merger;
pInfo->bufCapacity = numOfRows;
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
int32_t numOfOutput = taosArrayGetSize(pExprInfo);
pInfo->binfo.capacity = 4096;
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
pInfo->binfo.pCtx = createSqlFunctionCtx_rv( pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
{ // todo extract method to create prev compare buffer
int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) {
// len += pExpr[i].base.colBytes;
}
int32_t numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0;
// pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
// for(int32_t i = 0; i < numOfCols; ++i) {
// pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset;
//
// SColIndex* index = taosArrayGet(pInfo->groupColumnList, i);
// offset += pExpr[index->colIndex].base.resSchema.bytes;
// }
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SColIndex* index = taosArrayGet(pInfo->orderColumnList, i);
// offset += pExpr[index->colIndex].base.colBytes;
}
}
int32_t numOfRows = 1;
// setDefaultOutputBuf_rv(pExprInfo, numOfRows);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiwaySortOperator";
// pOperator->operatorType = OP_MultiwayMergeSort;
pOperator->blockingOptr = false;
pOperator->name = "SortMerge";
pOperator->operatorType = OP_SortMerge;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->pExpr = exprArrayDup(pExprInfo);
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr;
// pOperator->exec = doMultiwayMergeSort;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSortMerge;
pOperator->cleanupFn = destroyGlobalAggOperatorInfo;
appendDownstream(pOperator, downstream);
return pOperator;
}
typedef struct SExternalMemSource {
SArray* pageIdList;
int32_t pageIndex;
int32_t sourceId;
int32_t rowIndex;
SSDataBlock *pBlock;
} SExternalMemSource;
......@@ -5879,8 +5840,6 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, in
}
pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId);
pSource->sourceId = pInfo->sourceId;
pSource->pBlock = calloc(1, sizeof(SSDataBlock));
pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
pSource->pBlock->info.numOfCols = numOfCols;
......@@ -7197,17 +7156,6 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n
return TSDB_CODE_SUCCESS;
}
static SExprInfo* exprArrayDup(SArray* pExprInfo) {
size_t numOfOutput = taosArrayGetSize(pExprInfo);
SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo));
for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
assignExprInfo(&p[i], pExpr);
}
return p;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
......@@ -8198,12 +8146,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
// Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa);
idList = taosArrayInit(numOfTables, sizeof(uint64_t));
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
taosArrayPush(idList, &pkeyInfo->uid);
}
idList = taosArrayInit(numOfTables, sizeof(uint64_t));
} else {
idList = taosArrayInit(4, sizeof(uint64_t));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册