提交 a7d10002 编写于 作者: H Haojun Liao

[td-225] fix bugs in limit/offset for super table projection query.

上级 2c621390
...@@ -97,7 +97,9 @@ typedef struct { ...@@ -97,7 +97,9 @@ typedef struct {
STSCursor cur; STSCursor cur;
} SQueryStatusInfo; } SQueryStatusInfo;
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// todo move to utility // todo move to utility
...@@ -278,6 +280,20 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -278,6 +280,20 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
return maxOutput; return maxOutput;
} }
/*
* the value of number of result needs to be update due to offset value upated.
*/
void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
assert(pResInfo->numOfRes > numOfRes);
pResInfo->numOfRes = numOfRes;
}
}
static int32_t getGroupResultId(int32_t groupIndex) { static int32_t getGroupResultId(int32_t groupIndex) {
int32_t base = 200000; int32_t base = 200000;
return base + (groupIndex * 10000); return base + (groupIndex * 10000);
...@@ -354,9 +370,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) { ...@@ -354,9 +370,7 @@ bool isSelectivityWithTagsQuery(SQuery *pQuery) {
bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; } bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.functionId == TSDB_FUNC_TS_COMP; }
static bool limitResults(SQInfo *pQInfo) { static bool limitResults(SQuery *pQuery) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) { if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total; pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total;
assert(pQuery->rec.rows > 0); assert(pQuery->rec.rows > 0);
...@@ -626,6 +640,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, ...@@ -626,6 +640,7 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
int32_t i = 0; int32_t i = 0;
int64_t skey = TSKEY_INITIAL_VAL; int64_t skey = TSKEY_INITIAL_VAL;
// TODO opt performance: get the closed time window here
for (i = 0; i < pWindowResInfo->size; ++i) { for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i]; SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) { if (pResult->status.closed) {
...@@ -2408,6 +2423,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2408,6 +2423,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom if (tmp == NULL) { // todo handle the oom
assert(0);
} else { } else {
pQuery->sdata[i] = (tFilePage *)tmp; pQuery->sdata[i] = (tFilePage *)tmp;
} }
...@@ -2421,7 +2437,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2421,7 +2437,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
qTrace("QInfo: %p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv), qTrace("QInfo:%p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
newSize, pRec->capacity, newSize - pRec->rows); newSize, pRec->capacity, newSize - pRec->rows);
pRec->capacity = newSize; pRec->capacity = newSize;
...@@ -2434,11 +2450,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2434,11 +2450,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
// save last access position // while the output buffer is full or limit/offset is applied, query may be paused here
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) {
break; break;
} }
} }
...@@ -3173,30 +3189,37 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3173,30 +3189,37 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
} }
if (pQuery->rec.rows <= pQuery->limit.offset) { if (pQuery->rec.rows <= pQuery->limit.offset) {
qTrace("QInfo:%p skip rows:%d, new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pQuery->rec.rows,
pQuery->limit.offset - pQuery->rec.rows);
pQuery->limit.offset -= pQuery->rec.rows; pQuery->limit.offset -= pQuery->rec.rows;
pQuery->rec.rows = 0; pQuery->rec.rows = 0;
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
// clear the buffer is full flag if exists // clear the buffer is full flag if exists
pQuery->status &= (~QUERY_RESBUF_FULL); CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL);
} else { } else {
int32_t numOfSkip = (int32_t) pQuery->limit.offset; int64_t numOfSkip = pQuery->limit.offset;
pQuery->rec.rows -= numOfSkip; pQuery->rec.rows -= numOfSkip;
pQuery->limit.offset = 0;
qTrace("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip,
0, pQuery->rec.rows);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].base.functionId; int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes; int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes); memmove(pQuery->sdata[i]->data, pQuery->sdata[i]->data + bytes * numOfSkip, pQuery->rec.rows * bytes);
pRuntimeEnv->pCtx[i].aOutputBuf += bytes * numOfSkip; pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pQuery->rec.rows;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pRuntimeEnv->pCtx[i].ptsOutputBuf += TSDB_KEYSIZE * numOfSkip; pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
} }
} }
pQuery->limit.offset = 0; updateNumOfResult(pRuntimeEnv, pQuery->rec.rows);
} }
} }
...@@ -3205,7 +3228,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) { ...@@ -3205,7 +3228,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
pQuery->status = status; pQuery->status = status;
} else { } else {
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
pQuery->status &= (~QUERY_NOT_COMPLETED); CLEAR_QUERY_STATUS(pQuery, QUERY_NOT_COMPLETED);
pQuery->status |= status; pQuery->status |= status;
} }
} }
...@@ -3957,7 +3980,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc ...@@ -3957,7 +3980,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes);
} }
...@@ -4075,7 +4098,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { ...@@ -4075,7 +4098,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock);
pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index pRuntimeEnv->windowResInfo.curIndex = index; // restore the window index
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, numOfRes:%d",
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
return true; return true;
} else { // do nothing } else { // do nothing
...@@ -4532,8 +4555,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4532,8 +4555,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
continue; continue;
} }
// SPointInterpoSupporter pointInterpSupporter = {0};
// TODO handle the limit offset problem // TODO handle the limit offset problem
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
// skipBlocks(pRuntimeEnv); // skipBlocks(pRuntimeEnv);
...@@ -4544,12 +4565,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4544,12 +4565,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
} }
scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
// the limitation of output result is reached, set the query completed // the limitation of output result is reached, set the query completed
if (limitResults(pQInfo)) { if (limitResults(pQuery)) {
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
break; break;
} }
...@@ -4578,18 +4597,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ...@@ -4578,18 +4597,15 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break; break;
} }
} else { // forward query range } else {
pQuery->window.skey = pQuery->current->lastKey;
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->rec.rows == 0) { if (pQuery->rec.rows == 0) {
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
continue; continue;
} else { } else {
// pQInfo->pTableQuerySupporter->pMeterSidExtInfo[k]->key = pQuery->lastKey; // buffer is full, wait for the next round to retrieve data from current meter
// // buffer is full, wait for the next round to retrieve data from current meter assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
// assert(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)); break;
// break;
} }
} }
} }
...@@ -4809,7 +4825,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4809,7 +4825,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->rec.rows = getNumOfResult(pRuntimeEnv); pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
skipResults(pRuntimeEnv); skipResults(pRuntimeEnv);
limitResults(pQInfo); limitResults(pQuery);
} }
static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
...@@ -4857,7 +4873,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4857,7 +4873,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
limitResults(pQInfo); limitResults(pQuery);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
...@@ -4935,7 +4951,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4935,7 +4951,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
// the offset is handled at prepare stage if no interpolation involved // the offset is handled at prepare stage if no interpolation involved
if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) { if (pQuery->fillType == TSDB_FILL_NONE || pQuery->rec.rows == 0) {
limitResults(pQInfo); limitResults(pQuery);
break; break;
} else { } else {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime, TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->slidingTime,
...@@ -4947,7 +4963,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -4947,7 +4963,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pQInfo); limitResults(pQuery);
break; break;
} }
...@@ -4982,7 +4998,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4982,7 +4998,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows); qTrace("QInfo: %p fill results completed, final:%d", pQInfo, pQuery->rec.rows);
if (pQuery->rec.rows > 0) { if (pQuery->rec.rows > 0) {
limitResults(pQInfo); limitResults(pQuery);
} }
qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
......
...@@ -395,6 +395,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -395,6 +395,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfBlocks == 0) { // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0) { // no data block in this file, try next file
pCheckInfo->numOfBlocks = 0;
continue;//no data blocks in the file belongs to pCheckInfo->pTable continue;//no data blocks in the file belongs to pCheckInfo->pTable
} else { } else {
if (pCheckInfo->compSize < compIndex->len) { if (pCheckInfo->compSize < compIndex->len) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册