提交 47a983fb 编写于 作者: H hjxilinx

[td-98] fix bugs in reversed single table query

上级 119549c4
...@@ -170,11 +170,11 @@ typedef struct SQInfo { ...@@ -170,11 +170,11 @@ typedef struct SQInfo {
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
sem_t dataReady; sem_t dataReady;
STableGroupInfo groupInfo; // table id list
void* tsdb; void* tsdb;
STableGroupInfo groupInfo; // table id list
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t groupIndex;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
T_REF_DECLARE() T_REF_DECLARE()
......
...@@ -2900,14 +2900,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { ...@@ -2900,14 +2900,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
while (pQInfo->subgroupIdx < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
ret = mergeIntoGroupResultImpl(pQInfo, group); ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk if (ret < 0) { // not enough disk space to save the data into disk
return -1; return -1;
} }
pQInfo->subgroupIdx += 1; pQInfo->groupIndex += 1;
// this group generates at least one result, return results // this group generates at least one result, return results
if (ret > 0) { if (ret > 0) {
...@@ -2915,11 +2915,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { ...@@ -2915,11 +2915,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
} }
assert(pQInfo->numOfGroupResultPages == 0); assert(pQInfo->numOfGroupResultPages == 0);
dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1); dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
} }
dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st); pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2934,7 +2934,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2934,7 +2934,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
} }
// set current query completed // set current query completed
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
// return; // return;
// } // }
...@@ -2943,7 +2943,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2943,7 +2943,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t id = getGroupResultId(pQInfo->subgroupIdx - 1); int32_t id = getGroupResultId(pQInfo->groupIndex - 1);
SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id);
int32_t total = 0; int32_t total = 0;
...@@ -3149,7 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { ...@@ -3149,7 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) {
r = capacity; r = capacity;
} }
int32_t id = getGroupResultId(pQInfo->subgroupIdx) + pQInfo->numOfGroupResultPages; int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages;
tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId); tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId);
// pagewise copy to dest buffer // pagewise copy to dest buffer
...@@ -3198,8 +3198,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * ...@@ -3198,8 +3198,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
buf->resultInfo[j].complete = false; buf->resultInfo[j].complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
buf->resultInfo[j].complete = true; buf->resultInfo[j].complete = true;
...@@ -3208,32 +3208,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * ...@@ -3208,32 +3208,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *
} }
} }
void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
// group by normal columns and interval query on normal table // group by normal columns and interval query on normal table
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u;
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
} else { // for simple result of table query, } else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j];
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
pCtx->resultInfo->complete = false; pCtx->resultInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
pCtx->resultInfo->complete = true; pCtx->resultInfo->complete = true;
} }
} }
} }
pQuery->order.order = pQuery->order.order ^ 1u;
} }
void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
...@@ -3259,14 +3255,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { ...@@ -3259,14 +3255,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) {
pQuery->order.order = (pQuery->order.order) ^ 1u; pQuery->order.order = (pQuery->order.order) ^ 1u;
} }
void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
} }
pQuery->order.order = (pQuery->order.order) ^ 1u;
} }
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
...@@ -3381,8 +3374,8 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3381,8 +3374,8 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) {
} }
typedef struct SQueryStatus { typedef struct SQueryStatus {
int8_t overStatus; int8_t status;
TSKEY lastKey; // TSKEY lastKey;
STSCursor cur; STSCursor cur;
} SQueryStatus; } SQueryStatus;
...@@ -3390,8 +3383,8 @@ typedef struct SQueryStatus { ...@@ -3390,8 +3383,8 @@ typedef struct SQueryStatus {
static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
pStatus->overStatus = pQuery->status; pStatus->status = pQuery->status;
pStatus->lastKey = pQuery->lastKey; // pStatus->lastKey = pQuery->lastKey;
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
...@@ -3402,21 +3395,21 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus ...@@ -3402,21 +3395,21 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); // SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
pQuery->lastKey = pQuery->window.skey; // pQuery->lastKey = pQuery->window.skey;
} }
static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
pQuery->lastKey = pStatus->lastKey; // pQuery->lastKey = pStatus->lastKey;
pQuery->status = pStatus->overStatus; pQuery->status = pStatus->status;
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur);
} }
static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryStatus qStatus = {0}; SQueryStatus qStatus = {0};
...@@ -3428,19 +3421,15 @@ static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3428,19 +3421,15 @@ static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) {
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
// close necessary function execution during supplementary scan // close necessary function execution during supplementary scan
disableFuncInReverseScan(pRuntimeEnv, pQuery->order.order); disableFuncInReverseScan(pRuntimeEnv);
queryStatusSave(pRuntimeEnv, &qStatus); queryStatusSave(pRuntimeEnv, &qStatus);
// STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
// reverse scan from current position // reverse scan from current position
// tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order);
doScanAllDataBlocks(pRuntimeEnv); doScanAllDataBlocks(pRuntimeEnv);
queryStatusRestore(pRuntimeEnv, &qStatus); queryStatusRestore(pRuntimeEnv, &qStatus);
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); setCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv);
} }
...@@ -3508,7 +3497,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3508,7 +3497,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t skey = pQuery->lastKey; int64_t skey = pQuery->lastKey;
int32_t status = pQuery->status; int32_t status = pQuery->status;
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex;
SET_MASTER_SCAN_FLAG(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
...@@ -3517,55 +3506,98 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3517,55 +3506,98 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
doScanAllDataBlocks(pRuntimeEnv); doScanAllDataBlocks(pRuntimeEnv);
if (!needScanDataBlocksAgain(pRuntimeEnv)) { if (!needScanDataBlocksAgain(pRuntimeEnv)) {
// restore the status // restore the status
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
pQuery->status = status; pQuery->status = status; // restore the status code when abort from repeat scan
} }
break; break;
} }
STsdbQueryCond cond = { STsdbQueryCond cond = {
.twindow = {pQuery->window.skey, pQuery->lastKey}, .twindow = {.skey = skey, .ekey = pQuery->lastKey - step},
.order = pQuery->order.order, .order = pQuery->order.order,
.colList = pQuery->colList, .colList = pQuery->colList,
.numOfCols = pQuery->numOfCols, .numOfCols = pQuery->numOfCols,
}; };
if (pRuntimeEnv->pSecQueryHandle != NULL) { if (pRuntimeEnv->pSecQueryHandle == NULL) {
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
} }
status = pQuery->status; status = pQuery->status; // backup the status
pRuntimeEnv->windowResInfo.curIndex = activeSlot; pRuntimeEnv->windowResInfo.curIndex = prevSlot;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED); setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
// check if query is killed or not // check if query is killed or not
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(pQInfo)) {
return; return;
} }
} }
if (!needReverseScan(pQuery)) {
return;
}
// save the query time window
STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey};
// no need to set the end key // reverse order time range
TSKEY lkey = pQuery->lastKey; pQuery->window.skey = pQuery->lastKey - step;
TSKEY ekey = pQuery->window.ekey; pQuery->window.ekey = skey;
pQuery->window.skey = skey; pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
pQuery->window.ekey = pQuery->lastKey - step;
// /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); STsdbQueryCond cond = {
.twindow = pQuery->window,
doReverseScan(pRuntimeEnv); .order = pQuery->order.order,
.colList = pQuery->colList,
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan .numOfCols = pQuery->numOfCols,
pQuery->lastKey = lkey; };
pQuery->window.ekey = ekey;
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo);
// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order);
// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle); dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv));
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
int32_t status1 = pQuery->status;
STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
tsBufNextPos(pRuntimeEnv->pTSBuf);
}
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
setCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
// reverse scan from current position
doScanAllDataBlocks(pRuntimeEnv);
pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur);
if (pRuntimeEnv->pTSBuf) {
pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order;
}
setCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery->lastKey = prev.skey;
pQuery->status = status1;
pQuery->window.ekey = prev.ekey;
} }
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
...@@ -3859,17 +3891,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde ...@@ -3859,17 +3891,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
int32_t totalSubset = getNumOfSubset(pQInfo); int32_t totalSubset = getNumOfSubset(pQInfo);
if (orderType == TSDB_ORDER_ASC) { if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->subgroupIdx; startIdx = pQInfo->groupIndex;
step = 1; step = 1;
} else { // desc order copy all data } else { // desc order copy all data
startIdx = totalSubset - pQInfo->subgroupIdx - 1; startIdx = totalSubset - pQInfo->groupIndex - 1;
step = -1; step = -1;
} }
for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) { for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) {
if (result[i].numOfRows == 0) { if (result[i].numOfRows == 0) {
pQInfo->offset = 0; pQInfo->offset = 0;
pQInfo->subgroupIdx += 1; pQInfo->groupIndex += 1;
continue; continue;
} }
...@@ -3887,7 +3919,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde ...@@ -3887,7 +3919,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
pQInfo->offset += numOfRowsToCopy; pQInfo->offset += numOfRowsToCopy;
} else { } else {
pQInfo->offset = 0; pQInfo->offset = 0;
pQInfo->subgroupIdx += 1; pQInfo->groupIndex += 1;
} }
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
...@@ -4474,14 +4506,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4474,14 +4506,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0);
#if 0 #if 0
while (pQInfo->subgroupIdx < numOfGroups) { while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
size_t numOfTable = taosArrayGetSize(group); size_t numOfTable = taosArrayGetSize(group);
if (isFirstLastRowQuery(pQuery)) { if (isFirstLastRowQuery(pQuery)) {
dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pQInfo->subgroupIdx); pQInfo->groupIndex);
TSKEY key = -1; TSKEY key = -1;
int32_t index = -1; int32_t index = -1;
...@@ -4502,7 +4534,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4502,7 +4534,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
// assert(num >= 0); // assert(num >= 0);
} else { } else {
dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet,
pQInfo->subgroupIdx); pQInfo->groupIndex);
for (int32_t k = start; k <= end; ++k) { for (int32_t k = start; k <= end; ++k) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
...@@ -4520,7 +4552,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4520,7 +4552,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} }
pSupporter->subgroupIdx++; pSupporter->groupIndex++;
// output buffer is full, return to client // output buffer is full, return to client
if (pQuery->size >= pQuery->pointsToRead) { if (pQuery->size >= pQuery->pointsToRead) {
...@@ -4537,7 +4569,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4537,7 +4569,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
* we need to return it to client in the first place. * we need to return it to client in the first place.
*/ */
if (pQInfo->subgroupIdx > 0) { if (pQInfo->groupIndex > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQuery->rec.total += pQuery->rec.rows; pQuery->rec.total += pQuery->rec.rows;
...@@ -4667,7 +4699,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4667,7 +4699,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
} }
pQInfo->subgroupIdx = 0; pQInfo->groupIndex = 0;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
} }
...@@ -4744,7 +4776,7 @@ static void doRestoreContext(SQInfo* pQInfo) { ...@@ -4744,7 +4776,7 @@ static void doRestoreContext(SQInfo* pQInfo) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
} }
enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); setCtxOrder(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv);
} }
...@@ -4777,9 +4809,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4777,9 +4809,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
if (pQInfo->subgroupIdx > 0) { if (pQInfo->groupIndex > 0) {
/* /*
* if the subgroupIdx > 0, the query process must be completed yet, we only need to * if the groupIndex > 0, the query process must be completed yet, we only need to
* copy the data into output buffer * copy the data into output buffer
*/ */
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
...@@ -4841,7 +4873,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4841,7 +4873,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
} }
if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); // assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery); copyResToQueryResultBuf(pQInfo, pQuery);
...@@ -4979,11 +5011,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) { ...@@ -4979,11 +5011,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
tableIntervalProcessImpl(pRuntimeEnv); tableIntervalProcessImpl(pRuntimeEnv);
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
pQInfo->subgroupIdx = 0; // always start from 0 pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
} }
// the offset is handled at prepare stage if no interpolation involved // the offset is handled at prepare stage if no interpolation involved
...@@ -5015,10 +5047,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) { ...@@ -5015,10 +5047,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
// all data scanned, the group by normal column can return // all data scanned, the group by normal column can return
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result
pQInfo->subgroupIdx = 0; pQInfo->groupIndex = 0;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
} }
pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsInterpo += numOfInterpo;
...@@ -5054,13 +5086,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { ...@@ -5054,13 +5086,13 @@ static void tableQueryImpl(SQInfo* pQInfo) {
// todo limit the output for interval query? // todo limit the output for interval query?
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
pQInfo->subgroupIdx = 0; // always start from 0 pQInfo->groupIndex = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) { if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQuery->rec.rows += pQuery->rec.rows; pQuery->rec.rows += pQuery->rec.rows;
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册