提交 4b041a8b 编写于 作者: H Haojun Liao

[td-13039] support order by.

上级 bee71f5f
......@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo {
} SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo {
typedef struct SSortOperatorInfo {
int32_t colIndex;
int32_t order;
SSDataBlock *pDataBlock;
} SOrderOperatorInfo;
} SSortOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
......@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SOperatorInfo* createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
pRuntimeEnv->proot = createSortOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break;
}
......@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
SSortOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
while(1) {
......@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
SOperatorInfo *createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SSortOperatorInfo* pInfo = calloc(1, sizeof(SSortOperatorInfo));
{
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
......@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
}
......
......@@ -29,8 +29,9 @@ typedef struct SCorEpSet {
} SCorEpSet;
typedef struct SBlockOrderInfo {
bool nullFirst;
int32_t order;
int32_t colIndex;
int32_t slotId;
SColumnInfoData* pColData;
} SBlockOrderInfo;
......@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
......
......@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
typedef struct SSDataBlockSortHelper {
SArray* orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock;
bool nullFirst;
} SSDataBlockSortHelper;
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
......@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
}
if (rightNull) {
return pHelper->nullFirst ? 1 : -1;
return pOrder->nullFirst ? 1 : -1;
}
if (leftNull) {
return pHelper->nullFirst ? -1 : 1;
return pOrder->nullFirst ? -1 : 1;
}
}
......@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) {
}
}
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
if (pDataBlock->info.rows <= 1) {
return TSDB_CODE_SUCCESS;
......@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex);
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
if (pColInfoData->hasNull) {
sortColumnHasNull = true;
}
......@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
}
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
......@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
for (int32_t i = 0; i < numOfCols; ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
pInfo->pColData = pColInfo;
sortValLengthPerRow += pColInfo->info.bytes;
}
......@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// Allocate the additional buffer.
int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
uint32_t rows = pDataBlock->info.rows;
SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
......
......@@ -593,8 +593,7 @@ typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
bool hasVarCol;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
......@@ -613,12 +612,10 @@ typedef struct SSortedMergeOperatorInfo {
SAggSupporter aggSup;
} SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo {
typedef struct SSortOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
SArray *orderInfo;
bool nullFirst;
SArray* pSortInfo;
SSortHandle *pSortHandle;
int32_t bufPageSize;
int32_t numOfRowsInRes;
......@@ -629,7 +626,7 @@ typedef struct SOrderOperatorInfo {
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo;
} SSortOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
......@@ -655,8 +652,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
......
......@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr);
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
/**
*
......
......@@ -19,8 +19,6 @@
#include "executil.h"
#include "executorimpl.h"
//#include "queryLog.h"
#include "tbuffer.h"
#include "tcompression.h"
#include "tlosertree.h"
......
......@@ -322,26 +322,6 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEn
}
//setup the output buffer for each operator
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
const static int32_t minSize = 8;
SSDataBlock *res = taosMemoryCalloc(1, sizeof(SSDataBlock));
res->info.numOfCols = numOfOutput;
res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData idata = {{0}};
idata.info.type = pExpr[i].base.resSchema.type;
idata.info.bytes = pExpr[i].base.resSchema.bytes;
idata.info.colId = pExpr[i].base.resSchema.colId;
int32_t size = TMAX(idata.info.bytes * numOfRows, minSize);
idata.pData = taosMemoryCalloc(1, size); // at least to hold a pointer on x64 platform
taosArrayPush(res->pDataBlock, &idata);
}
return res;
}
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
......@@ -5840,14 +5820,14 @@ static void cleanupAggSup(SAggSupporter* pAggSup);
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param;
taosArrayDestroy(pInfo->orderInfo);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->groupInfo);
if (pInfo->pSortHandle != NULL) {
tsortDestroySortHandle(pInfo->pSortHandle);
}
blockDataDestroy(pInfo->binfo.pRes);
blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup);
}
......@@ -6105,12 +6085,10 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity);
}
SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)");
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
taosMemoryFreeClear(p);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
......@@ -6128,29 +6106,6 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) {
return doMerge(pOperator);
}
static SArray* createBlockOrder(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal) {
SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
size_t numOfOrder = taosArrayGetSize(pOrderVal);
for (int32_t j = 0; j < numOfOrder; ++j) {
SBlockOrderInfo orderInfo = {0};
SOrder* pOrder = taosArrayGet(pOrderVal, j);
orderInfo.order = pOrder->order;
for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pExprInfo[i];
if (pExpr->base.resSchema.colId == pOrder->col.colId) {
orderInfo.colIndex = i;
break;
}
}
taosArrayPush(pOrderInfo, &orderInfo);
}
return pOrderInfo;
}
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) {
if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
return 0;
......@@ -6199,7 +6154,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) {
SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -6228,7 +6183,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
// pRuntimeEnv->pQueryAttr->topBotQuery, false));
pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 1024;
pInfo->orderInfo = createBlockOrder(pExprInfo, num, pOrderVal);
pInfo->pSortInfo = pSortInfo;
pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
......@@ -6268,35 +6223,34 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) {
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOrderOperatorInfo* pInfo = pOperator->info;
SSortOperatorInfo* pInfo = pOperator->info;
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
if (pOperator->status == OP_RES_TO_RETURN) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
}
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize,
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize,
numOfBufPage, pInfo->pDataBlock, "GET_TASKID(pTaskInfo)");
taosMemoryFreeClear(p);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
ps->param = pOperator;
ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps);
// TODO set error code;
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes);
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
}
SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) {
SOrderOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SOrderOperatorInfo));
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
......@@ -6305,37 +6259,21 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
return NULL;
}
pInfo->sortBufSize = 1024 * 16; // 1MB
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->orderInfo = createBlockOrder(pExprInfo, numOfCols, pOrderVal);
for(int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pExprInfo[i].base.resSchema.type)) {
pInfo->hasVarCol = true;
break;
}
}
pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) {
taosMemoryFreeClear(pOperator);
destroyOrderOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pInfo);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pOperator->name = "Order";
pOperator->name = "Sort";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
pOperator->getNextFn = doSort;
pOperator->closeFn = destroyOrderOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator;
......@@ -6496,7 +6434,6 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l
offset += valueLen;
initResultRow(resultRow);
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
}
......@@ -7505,10 +7442,10 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
taosArrayDestroy(pInfo->orderInfo);
taosArrayDestroy(pInfo->pSortInfo);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -8035,7 +7972,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
offset += pExpr[index->colIndex].base.resSchema.bytes;
}
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity);
pOperator->name = "SLimitOperator";
......@@ -8328,7 +8265,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato
pInfo->outputCapacity = 4096;
pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo));
pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -8536,8 +8473,9 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
......@@ -8623,16 +8561,37 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
//todo: set the correct primary timestamp key column
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding,
SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset};
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset};
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
}
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
int32_t num = 0;
SExprInfo *pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num);
SSDataBlock *pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray *info = createSortInfo(pSortPhyNode->pSortKeys);
return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo);
}
} else {
ASSERT(0);
}/*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
......@@ -8725,6 +8684,38 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList;
}
SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return pList;
}
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i);
SOrderByExprNode* pSortKey = (SOrderByExprNode*) pNode->pExpr;
SBlockOrderInfo bi = {0};
bi.order = (pSortKey->order == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC;
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
bi.slotId = pColNode->slotId;
// pColNode->order;
// SColumn c = {0};
// c.slotId = pColNode->slotId;
// c.colId = pColNode->colId;
// c.type = pColNode->node.resType.type;
// c.bytes = pColNode->node.resType.bytes;
// c.precision = pColNode->node.resType.precision;
// c.scale = pColNode->node.resType.scale;
taosArrayPush(pList, &bi);
}
return pList;
}
SArray* extractColMatchInfo(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
......
......@@ -23,20 +23,19 @@
#include "tsort.h"
#include "tutil.h"
typedef struct STupleHandle {
struct STupleHandle {
SSDataBlock* pBlock;
int32_t rowIndex;
} STupleHandle;
};
typedef struct SSortHandle {
struct SSortHandle {
int32_t type;
int32_t pageSize;
int32_t numOfPages;
SDiskbasedBuf *pBuf;
SArray *pOrderInfo;
bool nullFirst;
SArray *pSortInfo;
SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp;
......@@ -60,7 +59,7 @@ typedef struct SSortHandle {
bool inMemSort;
bool needAdjust;
STupleHandle tupleHandle;
} SSortHandle;
};
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
......@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) {
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->pOrderInfo = pOrderInfo;
pSortHandle->nullFirst = nullFirst;
pSortHandle->cmpParam.orderInfo = pOrderInfo;
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pDataBlock = createOneDataBlock(pBlock);
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
tsortSetComparFp(pSortHandle, msortComparFn);
if (idstr != NULL) {
......@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
}
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false;
if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
......@@ -542,7 +541,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst);
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
......@@ -555,7 +554,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst);
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) {
......
......@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
......@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false;
SSDataBlock* pRes = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册