未验证 提交 0b798123 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21836 from wangjiaming0909/feature/3.0/optimize_order_limit

feature: optimize order by limit using priority queue
......@@ -187,6 +187,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
......
......@@ -184,6 +184,7 @@ extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -246,6 +246,7 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
} SSortLogicNode;
typedef struct SPartitionLogicNode {
......@@ -523,6 +524,7 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
int64_t maxRows;
} SSortPhysiNode;
typedef SSortPhysiNode SGroupSortPhysiNode;
......
......@@ -17,6 +17,7 @@
#define _TD_UTIL_HEAP_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -58,6 +59,48 @@ void heapDequeue(Heap* heap);
size_t heapSize(Heap* heap);
typedef bool (*pq_comp_fn)(void* l, void* r, void* param);
typedef struct PriorityQueueNode {
void* data;
} PriorityQueueNode;
typedef struct PriorityQueue PriorityQueue;
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param);
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn);
void destroyPriorityQueue(PriorityQueue* pq);
PriorityQueueNode* taosPQTop(PriorityQueue* pq);
size_t taosPQSize(PriorityQueue* pq);
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
void taosPQPop(PriorityQueue* pq);
typedef struct BoundedQueue BoundedQueue;
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param);
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);
void destroyBoundedQueue(BoundedQueue* q);
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
PriorityQueueNode* taosBQTop(BoundedQueue* q);
size_t taosBQSize(BoundedQueue* q);
size_t taosBQMaxSize(BoundedQueue* q);
void taosBQBuildHeap(BoundedQueue* q);
void taosBQPop(BoundedQueue* q);
#ifdef __cplusplus
}
#endif
......
......@@ -47,6 +47,17 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
}
}
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0;
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
else
return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
}
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
......
......@@ -62,6 +62,7 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
// sync raft
int32_t tsElectInterval = 25 * 1000;
......@@ -533,6 +534,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
......@@ -914,6 +916,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32;
GRANT_CFG_GET;
return 0;
......
......@@ -75,10 +75,11 @@ typedef struct SResultRowInfo {
} SResultRowInfo;
typedef struct SColMatchItem {
int32_t colId;
int32_t srcSlotId;
int32_t dstSlotId;
bool needOutput;
int32_t colId;
int32_t srcSlotId;
int32_t dstSlotId;
bool needOutput;
SDataType dataType;
} SColMatchItem;
typedef struct SColMatchInfo {
......
......@@ -64,10 +64,14 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
/**
*
* @param type
* @param maxRows keep maxRows at most
* @param maxTupleLength max len of one tuple, for check if heap sort is applicable
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr);
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);
/**
*
......
......@@ -1305,6 +1305,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.dstSlotId = pNode->slotId;
c.dataType = pColNode->node.resType;
taosArrayPush(pList, &c);
}
}
......
......@@ -2801,7 +2801,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
......
......@@ -29,6 +29,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
uint64_t maxTupleLength;
int64_t maxRows;
} SSortOperatorInfo;
static SSDataBlock* doSort(SOperatorInfo* pOperator);
......@@ -36,6 +38,7 @@ static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroySortOperatorInfo(void* param);
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
......@@ -51,6 +54,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = pSortNode->maxRows;
int32_t numOfOutputCols = 0;
int32_t code =
......@@ -193,9 +198,9 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
}
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
......@@ -286,6 +291,20 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
return TSDB_CODE_SUCCESS;
}
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
size_t size = taosArrayGetSize(pColItem->pList);
for (size_t i = 0; i < size; ++i) {
pSortOperInfo->maxTupleLength += ((SColMatchItem*)taosArrayGet(pColItem->pList, i))->dataType.bytes;
}
size = LIST_LENGTH(pSortKeys);
for (size_t i = 0; i < size; ++i) {
SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
}
return TSDB_CODE_SUCCESS;
}
//=====================================================================================
// Group Sort Operator
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
......@@ -384,7 +403,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pCurrSortHandle =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
......@@ -582,7 +601,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str);
pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
......
......@@ -19,6 +19,7 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "tdef.h"
#include "theap.h"
#include "tlosertree.h"
#include "tpagedbuf.h"
#include "tsort.h"
......@@ -41,6 +42,12 @@ struct SSortHandle {
int64_t startTs;
uint64_t totalElapsed;
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int32_t sourceId;
SSDataBlock* pDataBlock;
SMsortComparParam cmpParam;
......@@ -61,6 +68,47 @@ struct SSortHandle {
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
// | offset[0] | offset[1] |....| nullbitmap | data |...|
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
return taosMemoryCalloc(1, totalLen);
}
static void destoryTuple(void* t) { taosMemoryFree(t); }
#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * colNum + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy(tuple + offset, data, length)
/**
* @param t the tuple pointer addr, if realloced, *t is changed to the new addr
* @param offset copy data into pTuple start from offset
* @param colIndex the columnIndex, for setting null bitmap
* @return the next offset to add field
* */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length,
bool isNull, uint32_t tupleLen) {
tupleSetOffset(*t, colIdx, offset);
if (isNull) {
tupleSetNull(*t, colIdx, colNum);
} else {
if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
*t = taosMemoryRealloc(*t, offset + length);
}
tupleSetData(*t, offset, data, length);
}
return offset + length;
}
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
if (tupleColIsNull(t, colIdx, colNum)) return NULL;
return t + *tupleOffset(t, colIdx);
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
}
......@@ -71,7 +119,8 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr) {
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
......@@ -80,6 +129,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows < 0)
pSortHandle->sortBufSize = 0;
else
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
......@@ -150,7 +206,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
if (pSortHandle == NULL) {
return;
}
tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(&pSortHandle->pMergeTree);
......@@ -159,6 +214,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
......@@ -769,17 +825,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -840,7 +886,7 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
return NULL;
}
......@@ -890,6 +936,168 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
return &pHandle->tupleHandle;
}
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
}
static bool tsortPQCompFn(void* a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res < 0) return 1;
return 0;
}
static bool tsortPQComFnReverse(void*a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res > 0) return 1;
return 0;
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
char* pLTuple = (char*)pLeft;
char* pRTuple = (char*)pRight;
SSortHandle* pHandle = (SSortHandle*)param;
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
for (int32_t i = 0; i < orderInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
if (!lData && !rData) continue;
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
}
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, colDataComparFn);
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
pHandle->pDataBlock = NULL;
uint32_t tupleLen = 0;
PriorityQueueNode pqNode;
while (1) {
// fetch data
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (NULL == pBlock) break;
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
size_t colNum = blockDataGetNumOfCols(pBlock);
if (tupleLen == 0) {
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
tupleLen += pCol->info.bytes;
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
tupleLen += sizeof(VarDataLenT);
}
}
}
size_t colLen = 0;
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
void* pTuple = createTuple(colNum, tupleLen);
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;
uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
tupleLen);
}
}
pqNode.data = pTuple;
taosBQPush(pHandle->pBoundedQueue, &pqNode);
}
}
return TSDB_CODE_SUCCESS;
}
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abondan the top tuple if queue size bigger than max size
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->tmpRowIdx == 0) {
// sort the results
taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
taosBQBuildHeap(pHandle->pBoundedQueue);
}
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
char* pTuple = (char*)node->data;
for (uint32_t i = 0; i < colNum; ++i) {
void* pData = tupleGetField(pTuple, i, colNum);
if (!pData) {
colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
} else {
colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
}
}
pHandle->pDataBlock->info.rows++;
pHandle->tmpRowIdx++;
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->pDataBlock->info.rows == 0) return NULL;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return &pHandle->tupleHandle;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
if (tsortIsPQSortApplicable(pHandle))
return tsortOpenForPQSort(pHandle);
else
return tsortOpenForBufMergeSort(pHandle);
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
if (pHandle->pBoundedQueue)
return tsortPQSortNextTuple(pHandle);
else
return tsortBufMergeSortNextTuple(pHandle);
}
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);
......
......@@ -502,6 +502,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(maxRows);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2100,6 +2100,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
static const char* jkSortPhysiPlanMaxRows = "MaxRows";
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
......@@ -2114,6 +2115,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
}
return code;
}
......@@ -2131,6 +2135,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
}
return code;
}
......
......@@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
......@@ -2609,6 +2609,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
}
return code;
}
......@@ -2632,6 +2635,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_CODE_MAX_ROWS:
code = tlvDecodeI64(pTlv, &pNode->maxRows);
break;
default:
break;
}
......
......@@ -1027,6 +1027,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
......@@ -1298,6 +1299,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -123,7 +123,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode
pNode->inputTsOrder = order;
switch (nodeType(pNode)) {
// for those nodes that will change the order, stop propagating
//case QUERY_NODE_LOGIC_PLAN_WINDOW:
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
case QUERY_NODE_LOGIC_PLAN_JOIN:
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -769,8 +769,9 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond)
}
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
//TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) {
// TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type ||
pLeft->node.resType.bytes != pRight->node.resType.bytes) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
......@@ -2575,7 +2576,7 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
//TODO: only set the slimit now. push down slimit later
// TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
......@@ -2629,8 +2630,16 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
// if we have pushed down, we skip it
if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
return false;
}
return true;
......@@ -2644,8 +2653,18 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
nodesDestroyNode(pChild->pLimit);
pChild->pLimit = pNode->pLimit;
pNode->pLimit = NULL;
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
int64_t maxRows = -1;
if (pLimitNode->limit != -1) {
maxRows = pLimitNode->limit;
if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
}
((SSortLogicNode*)pChild)->maxRows = maxRows;
} else {
pChild->pLimit = pNode->pLimit;
pNode->pLimit = NULL;
}
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
......@@ -2898,7 +2917,7 @@ static SSortLogicNode* sortNonPriKeySatisfied(SLogicNode* pNode) {
if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) {
return NULL;
}
SNode* pSortKeyNode = NULL, *pSortKeyExpr = NULL;
SNode *pSortKeyNode = NULL, *pSortKeyExpr = NULL;
FOREACH(pSortKeyNode, pSort->pSortKeys) {
pSortKeyExpr = ((SOrderByExprNode*)pSortKeyNode)->pExpr;
switch (nodeType(pSortKeyExpr)) {
......@@ -2931,7 +2950,7 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
optFindEligibleNode(pLogicSubplan->pNode, sortNonPriKeyShouldOptimize, pNodeList);
SNode* pNode = NULL;
FOREACH(pNode, pNodeList) {
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
pSort->node.outputTsOrder = pOrderByExpr->order;
optSetParentOrder(pSort->node.pParent, pOrderByExpr->order, NULL);
......
......@@ -1374,6 +1374,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = pSortLogicNode->maxRows;
SNodeList* pPrecalcExprs = NULL;
SNodeList* pSortKeys = NULL;
......
......@@ -1018,6 +1018,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
pPartSort->groupSort = pSort->groupSort;
pPartSort->maxRows = pSort->maxRows;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}
......
......@@ -187,3 +187,172 @@ void heapRemove(Heap* heap, HeapNode* node) {
}
void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); }
struct PriorityQueue {
SArray* container;
pq_comp_fn fn;
FDelete deleteFn;
void* param;
};
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) {
PriorityQueue* pq = (PriorityQueue*)taosMemoryCalloc(1, sizeof(PriorityQueue));
pq->container = taosArrayInit(1, sizeof(PriorityQueueNode));
pq->fn = fn;
pq->deleteFn = deleteFn;
pq->param = param;
return pq;
}
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) {
pq->fn = fn;
}
void destroyPriorityQueue(PriorityQueue* pq) {
if (pq->deleteFn)
taosArrayDestroyP(pq->container, pq->deleteFn);
else
taosArrayDestroy(pq->container);
taosMemoryFree(pq);
}
static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */}
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void * tmp = a->data;
a->data = b->data;
b->data = tmp;
}
#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))
#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
size_t largest = from;
do {
from = largest;
size_t l = pqLeft(from);
size_t r = pqRight(from);
if (l < last && pq->fn(pqContainerGetEle(pq, from)->data, pqContainerGetEle(pq, l)->data, pq->param)) {
largest = l;
}
if (r < last && pq->fn(pqContainerGetEle(pq, largest)->data, pqContainerGetEle(pq, r)->data, pq->param)) {
largest = r;
}
if (largest != from) {
pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest));
}
} while (largest != from);
}
static void pqBuildHeap(PriorityQueue* pq) {
if (pqContainerSize(pq) > 1) {
for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
pqHeapify(pq, i, pqContainerSize(pq));
}
pqHeapify(pq, 0, pqContainerSize(pq));
}
}
static void pqReverseHeapify(PriorityQueue* pq, size_t i) {
while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
size_t parentIdx = pqParent(i);
pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx));
i = parentIdx;
}
}
static void pqUpdate(PriorityQueue* pq, size_t i) {
if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
// if value in pos i is smaller than parent, heapify down from i to the end
pqHeapify(pq, i, pqContainerSize(pq));
} else {
// if value in pos i is big than parent, heapify up from i
pqReverseHeapify(pq, i);
}
}
static void pqRemove(PriorityQueue* pq, size_t i) {
if (i == pqContainerSize(pq) - 1) {
taosArrayPop(pq->container);
return;
}
taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
taosArrayPop(pq->container);
pqUpdate(pq, i);
}
PriorityQueueNode* taosPQTop(PriorityQueue* pq) {
return pqContainerGetEle(pq, 0);
}
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
taosArrayPush(pq->container, node);
pqReverseHeapify(pq, pqContainerSize(pq) - 1);
}
void taosPQPop(PriorityQueue* pq) {
PriorityQueueNode* top = taosPQTop(pq);
if (pq->deleteFn) pq->deleteFn(top->data);
pqRemove(pq, 0);
}
struct BoundedQueue {
PriorityQueue* queue;
uint32_t maxSize;
};
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param) {
BoundedQueue* q = (BoundedQueue*)taosMemoryCalloc(1, sizeof(BoundedQueue));
q->queue = createPriorityQueue(fn, deleteFn, param);
taosArrayEnsureCap(q->queue->container, maxSize + 1);
q->maxSize = maxSize;
return q;
}
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) {
taosPQSetFn(q->queue, fn);
}
void destroyBoundedQueue(BoundedQueue* q) {
if (!q) return;
destroyPriorityQueue(q->queue);
taosMemoryFree(q);
}
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
if (pqContainerSize(q->queue) == q->maxSize + 1) {
PriorityQueueNode* top = pqContainerGetEle(q->queue, 0);
void *p = top->data;
top->data = n->data;
n->data = p;
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
pqHeapify(q->queue, 0, taosBQSize(q));
} else {
taosPQPush(q->queue, n);
}
}
PriorityQueueNode* taosBQTop(BoundedQueue* q) {
return taosPQTop(q->queue);
}
void taosBQBuildHeap(BoundedQueue *q) {
pqBuildHeap(q->queue);
}
size_t taosBQMaxSize(BoundedQueue* q) {
return q->maxSize;
}
size_t taosBQSize(BoundedQueue* q) {
return taosPQSize(q->queue);
}
void taosBQPop(BoundedQueue* q) {
taosPQPop(q->queue);
}
......@@ -468,7 +468,7 @@ if $data01 != 1 then
endi
## supertable aggregation + where + interval + group by order by tag + limit offset
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 2 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 2 offset 0
if $rows != 2 then
return -1
endi
......
......@@ -508,7 +508,7 @@ endi
### supertable aggregation + where + interval + group by order by tag + limit offset
## TBASE-345
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi
......@@ -554,7 +554,7 @@ if $data09 != 4 then
return -1
endi
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi
......
......@@ -126,7 +126,6 @@ endi
if $data10 != 1 then
return -1
endi
sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options;
if $rows != 2 then
return -1
......
......@@ -2558,3 +2558,243 @@ taos> select a.ts, a.c2, b.c2 from meters as a join (select * from meters order
2022-05-24 00:01:08.000 | 210 | 210 |
2022-05-24 00:01:08.000 | 210 | 210 |
taos> select ts, c2 from meters order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
_wstart | d | avg(c) |
================================================================================
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
2022-05-15 00:01:08.000 | 234 |
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
last(ts) | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
last(ts) | d |
========================================
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
last(ts) | d |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
ts | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-23 00:01:08.000 | 116 |
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
ts | d |
========================================
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
ts | d |
========================================
2022-05-15 00:01:08.000 | 234 |
......@@ -71,3 +71,30 @@ select a.ts, a.c2, b.c2 from meters as a join meters as b on a.ts = b.ts order b
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts desc\G;
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts asc\G;
select a.ts, a.c2, b.c2 from meters as a join (select * from meters order by ts desc) b on a.ts = b.ts order by a.ts asc;
select ts, c2 from meters order by c2;
select ts, c2 from meters order by c2 limit 4;
select ts, c2 from meters order by c2 limit 2,2;
select ts, c2 from meters order by ts asc, c2 desc limit 10;
select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
select ts, c2 from d1 order by c2;
select ts, c2 from d1 order by c2 limit 4;
select ts, c2 from d1 order by c2 limit 2,2;
select ts, c2 from d1 order by ts asc, c2 desc limit 10;
select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
......@@ -321,7 +321,7 @@ class TDTestCase:
limit = 5
offset = paraDict["rowsPerTbl"] * 2
offset = offset - 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset)
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1, max(c1) limit %d offset %d"%(limit, offset)
# tdLog.info("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册