未验证 提交 d9f63009 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #15375 from taosdata/feature/3_liaohj

fix(query):fix limit/offset bug.
...@@ -22,6 +22,7 @@ mac/ ...@@ -22,6 +22,7 @@ mac/
.mypy_cache .mypy_cache
*.tmp *.tmp
*.swp *.swp
*.swo
*.orig *.orig
src/connector/nodejs/node_modules/ src/connector/nodejs/node_modules/
src/connector/nodejs/out/ src/connector/nodejs/out/
......
...@@ -154,7 +154,6 @@ typedef struct SQueryTableDataCond { ...@@ -154,7 +154,6 @@ typedef struct SQueryTableDataCond {
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
int32_t type; // data block load type: int32_t type; // data block load type:
// int32_t numOfTWindows;
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;
......
...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { ...@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0; pConn->numOfQueries = pBasic->queryDesc ? taosArrayGetSize(pBasic->queryDesc) : 0;
pBasic->queryDesc = NULL; pBasic->queryDesc = NULL;
mDebug("queries updated in conn %d, num:%d", pConn->id, pConn->numOfQueries); mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);
taosWUnLockLatch(&pConn->queryLock); taosWUnLockLatch(&pConn->queryLock);
......
...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader; typedef struct STsdbReader STsdbReader;
#define BLOCK_LOAD_OFFSET_ORDER 1 #define TIMEWINDOW_RANGE_CONTAINED 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2 #define TIMEWINDOW_RANGE_EXTERNAL 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
......
...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo); void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow* pResultRow); void initResultRow(SResultRow* pResultRow);
void closeResultRow(SResultRow* pResultRow); void closeResultRow(SResultRow* pResultRow);
bool isResultRowClosed(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow);
......
...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo { ...@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder* pRecoder; SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime; uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize; uint64_t winInfoSize;
uint64_t tableInfoSize; uint64_t tableInfoSize;
uint64_t hashSize; uint64_t hashSize;
...@@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo { ...@@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo limitInfo; SLimitInfo limitInfo;
bool mergeDataBlocks; bool mergeDataBlocks;
SSDataBlock* pFinalRes; SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SIndefOperatorInfo { typedef struct SIndefOperatorInfo {
......
...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) { ...@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
} }
} }
void closeAllResultRows(SResultRowInfo* pResultRowInfo) {
// do nothing
}
bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); } bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { ...@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray* createSortInfo(SNodeList* pNodeList) { SArray* createSortInfo(SNodeList* pNodeList) {
size_t numOfCols = 0; size_t numOfCols = 0;
if (pNodeList != NULL) { if (pNodeList != NULL) {
numOfCols = LIST_LENGTH(pNodeList); numOfCols = LIST_LENGTH(pNodeList);
} else { } else {
numOfCols = 0; numOfCols = 0;
} }
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) { if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i); SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData idata = SColumnInfoData idata =
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId); createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
} }
} }
#ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num);
#endif
if (p != NULL) { if (p != NULL) {
p->subsidiaries.pCtx = pValCtx; p->subsidiaries.pCtx = pValCtx;
p->subsidiaries.num = num; p->subsidiaries.num = num;
...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node // TODO: get it from stable scan node
pCond->twindows = pTableScanNode->scanRange; pCond->twindows = pTableScanNode->scanRange;
pCond->suid = pTableScanNode->scan.suid; pCond->suid = pTableScanNode->scan.suid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;
...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter ...@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
} }
// get the correct time window according to the handled timestamp // get the correct time window according to the handled timestamp
// todo refactor
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order) { int32_t order) {
STimeWindow w = {0}; STimeWindow w = {0};
......
...@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { ...@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map); // hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize; // pSummary->hashSize = hashSize;
// add the merge time
pSummary->elapsedTime += pSummary->firstStageMergeTime;
// SResultRowPool* p = pTaskInfo->pool; // SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) { // if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p); // pSummary->winInfoSize = getResultRowPoolMemSize(p);
...@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { ...@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// pSummary->winInfoSize = 0; // pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0; // pSummary->numOfTimeWindows = 0;
// } // }
//
// calculateOperatorProfResults(pQInfo);
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
if (pSummary->pRecoder != NULL) { if (pSummary->pRecoder != NULL) {
qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64 qDebug(
" us, total blocks:%d, " "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%"
"load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64, PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0,
GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); pRecorder->totalCheckedRows);
} }
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...@@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
} }
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -3405,28 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3405,28 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break; break;
} }
// no results generated if (pProjectInfo->mergeDataBlocks && pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {
if (pInfo->pRes->info.rows == 0 || (!pProjectInfo->mergeDataBlocks)) { if (pRes->info.rows > 0) {
break; pFinalRes->info.groupId = pRes->info.groupId;
} pFinalRes->info.version = pRes->info.version;
if (pProjectInfo->mergeDataBlocks) {
pFinalRes->info.groupId = pInfo->pRes->info.groupId;
pFinalRes->info.version = pInfo->pRes->info.version;
// continue merge data, ignore the group id // continue merge data, ignore the group id
blockDataMerge(pFinalRes, pInfo->pRes); blockDataMerge(pFinalRes, pRes);
if (pFinalRes->info.rows + pRes->info.rows <= pOperator->resultInfo.threshold) {
if (pFinalRes->info.rows + pInfo->pRes->info.rows <= pOperator->resultInfo.threshold &&
pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {
continue; continue;
} }
} }
// do apply filter // do apply filter
SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes; doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL);
doFilter(pProjectInfo->pFilterNode, p, NULL); if (pFinalRes->info.rows > 0 || pRes->info.rows == 0) {
if (p->info.rows > 0) { break;
}
} else {
// do apply filter
if (pRes->info.rows > 0) {
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
if (pRes->info.rows == 0) {
continue;
}
}
break; break;
} }
} }
...@@ -3891,7 +3890,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -3891,7 +3890,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->pFilterNode = pProjPhyNode->node.pConditions;
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
...@@ -4422,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC ...@@ -4422,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid; pCond->suid = uid;
pCond->type = BLOCK_LOAD_OFFSET_ORDER; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
......
...@@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL);
} }
closeAllResultRows(&pInfo->binfo.resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
...@@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
...@@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock* pResBlock = pSliceInfo->pRes; SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
// if (pOperator->status == OP_RES_TO_RETURN) { // if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
......
Subproject commit 9cfa195713d1cae9edf417a8d49bde87dd971016 Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册