提交 773432c1 编写于 作者: H hjxilinx

[td-169] fix bugs in interval query with limit/offset conditions

上级 3f24ff44
...@@ -765,8 +765,8 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus ...@@ -765,8 +765,8 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
} }
static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin, static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin,
SWindowResInfo *pWindowResInfo, SDataBlockInfo *pDataBlockInfo, SDataBlockInfo *pDataBlockInfo, TSKEY *primaryKeys,
TSKEY *primaryKeys, __block_search_fn_t searchFn) { __block_search_fn_t searchFn) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
while (1) { while (1) {
...@@ -945,8 +945,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati ...@@ -945,8 +945,7 @@ static void blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStati
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t startPos = int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, primaryKeyCol, searchFn);
getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pDataBlockInfo, primaryKeyCol, searchFn);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
...@@ -1358,8 +1357,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY ...@@ -1358,8 +1357,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY
pCtx->preAggVals.isSet = false; pCtx->preAggVals.isSet = false;
} }
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery)? pQuery->pos : 0; pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos : 0;
pCtx->size = QUERY_IS_ASC_QUERY(pQuery)? size - pQuery->pos : pQuery->pos + 1; pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? size - pQuery->pos : pQuery->pos + 1;
uint32_t status = aAggs[functionId].nStatus; uint32_t status = aAggs[functionId].nStatus;
if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) { if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {
...@@ -1370,7 +1369,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY ...@@ -1370,7 +1369,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY
// last_dist or first_dist function // last_dist or first_dist function
// store the first&last timestamp into the intermediate buffer [1], the true // store the first&last timestamp into the intermediate buffer [1], the true
// value may be null but timestamp will never be null // value may be null but timestamp will never be null
// pCtx->ptsList = tsCol; // pCtx->ptsList = tsCol;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA ||
functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
/* /*
...@@ -1386,7 +1385,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY ...@@ -1386,7 +1385,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY
pTWAInfo->EKey = pQuery->window.ekey; pTWAInfo->EKey = pQuery->window.ekey;
} }
// pCtx->ptsList = tsCol; // pCtx->ptsList = tsCol;
} else if (functionId == TSDB_FUNC_ARITHM) { } else if (functionId == TSDB_FUNC_ARITHM) {
pCtx->param[1].pz = param; pCtx->param[1].pz = param;
...@@ -2496,8 +2495,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2496,8 +2495,8 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
} else { } else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY start = blockInfo.window.ekey - pQuery->intervalTime; getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &skey1,
getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); &ekey1, &w);
pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
...@@ -4061,7 +4060,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc ...@@ -4061,7 +4060,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed
pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step; pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step;
pQuery->limit.offset = 0; pQuery->limit.offset = 0;
return; return;
} }
...@@ -4074,7 +4073,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc ...@@ -4074,7 +4073,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->rows - 1); assert(pQuery->pos >= 0 && pQuery->pos <= pBlockInfo->rows - 1);
SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
// update the pQuery->limit.offset value, and pQuery->pos value // update the pQuery->limit.offset value, and pQuery->pos value
...@@ -4115,7 +4114,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4115,7 +4114,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey; pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
pQuery->lastKey += step; pQuery->lastKey += step;
qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows, pQuery->limit.offset); qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows,
pQuery->limit.offset);
} else { // find the appropriated start position in current block } else { // find the appropriated start position in current block
updateOffsetVal(pRuntimeEnv, &blockInfo); updateOffsetVal(pRuntimeEnv, &blockInfo);
break; break;
...@@ -4123,44 +4123,37 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -4123,44 +4123,37 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
} }
static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) { static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery * pQuery = pRuntimeEnv->pQuery;
// if queried with value filter, do NOT forward query start position // if queried with value filter, do NOT forward query start position
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { if (pQuery->limit.offset <= 0 || pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
return true; return true;
} }
if (pQuery->limit.offset > 0 && (!isTopBottomQuery(pQuery)) && pQuery->interpoType == TSDB_INTERPO_NONE) {
/* /*
* 1. for top/bottom query, the offset applies to the final result, not here * 1. for interval without interpolation query we forward pQuery->intervalTime at a time for
* 2. for interval without interpolation query we forward pQuery->intervalTime at a time for
* pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is * pQuery->limit.offset times. Since hole exists, pQuery->intervalTime*pQuery->limit.offset value is
* not valid. otherwise, we only forward pQuery->limit.offset number of points * not valid. otherwise, we only forward pQuery->limit.offset number of points
*/ */
if (isIntervalQuery(pQuery)) {
assert(pRuntimeEnv->windowResInfo.prevSKey == 0); assert(pRuntimeEnv->windowResInfo.prevSKey == 0);
TSKEY skey1, ekey1; TSKEY skey1, ekey1;
STimeWindow w = {0}; STimeWindow w = {0};
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) { while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
// todo handle no data situation
}
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery) && pWindowResInfo->prevSKey == 0) {
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1,
&ekey1, &w); &w);
pWindowResInfo->startTime = w.skey; pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
} else { } else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY start = blockInfo.window.ekey - pQuery->intervalTime; getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1,
getAlignQueryTimeWindow(pQuery, start, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); &w);
pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
...@@ -4170,88 +4163,64 @@ static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) { ...@@ -4170,88 +4163,64 @@ static UNUSED_FUNC bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo) {
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery); STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery);
while (pQuery->limit.offset > 0) { while (pQuery->limit.offset > 0) {
if ((win.ekey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(win.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pQuery->limit.offset -= 1;
pWindowResInfo->prevSKey = win.skey;
}
STimeWindow tw = win; STimeWindow tw = win;
getNextTimeWindow(pQuery, &tw); getNextTimeWindow(pQuery, &tw);
// next time window starts from current data block if (pQuery->limit.offset == 0) {
if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) { (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
// query completed // load the data block
if ((tw.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(tw.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
tw = win;
SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL); SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
int32_t startPos = getNextQualifiedWindow(pRuntimeEnv, &tw, pWindowResInfo, &blockInfo, pColInfoData->pData, tw = win;
binarySearchForKey); int32_t startPos =
getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
assert(startPos >= 0); assert(startPos >= 0);
pQuery->limit.offset -= 1;
// set the abort info // set the abort info
pQuery->pos = startPos; pQuery->pos = startPos;
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pWindowResInfo->prevSKey = tw.skey; pWindowResInfo->prevSKey = tw.skey;
win = tw;
continue;
} else {
if (!tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle); int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey,
if ((blockInfo.window.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) || &pRuntimeEnv->windowResInfo, pDataBlock);
(blockInfo.window.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
// set the window that start from the next data block
TSKEY key = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.skey : blockInfo.window.ekey;
STimeWindow n = getActiveTimeWindow(pWindowResInfo, key, pQuery);
// next data block are still covered by current time window qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d",
if (n.skey == win.skey && n.ekey == win.ekey) { GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes);
// do nothing return true;
} else { } else {
pQuery->limit.offset -= 1; // do nothing,
return true;
// query completed }
if ((n.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(n.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
} }
// set the abort info // next time window starts from current data block
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; if ((tw.skey <= blockInfo.window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.skey : blockInfo.window.ekey; (tw.ekey >= blockInfo.window.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
pWindowResInfo->prevSKey = n.skey; // load the data block, note that only the primary timestamp column is required
SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pQueryHandle, NULL);
SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
win = n; tw = win;
} int32_t startPos =
} getNextQualifiedWindow(pRuntimeEnv, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey);
} assert(startPos >= 0);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) || pQuery->limit.offset > 0) { // set the abort info
setQueryStatus(pQuery, QUERY_COMPLETED); pQuery->pos = startPos;
return false; pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pWindowResInfo->prevSKey = tw.skey;
win = tw;
} else { } else {
assert(0); break; // offset is not 0, and next time window locates in the next block.
// if (IS_DISK_DATA_BLOCK(pQuery)) {
// getTimestampInDiskBlock(pRuntimeEnv, 0);
}
} }
} else { // forward the start position for projection query
skipBlocks(&pQInfo->runtimeEnv);
if (pQuery->limit.offset > 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return false;
} }
} }
...@@ -5090,6 +5059,13 @@ static void tableIntervalProcess(SQInfo *pQInfo) { ...@@ -5090,6 +5059,13 @@ static void tableIntervalProcess(SQInfo *pQInfo) {
int32_t numOfInterpo = 0; int32_t numOfInterpo = 0;
// skip blocks without load the actual data block from file if no filter condition present
skipTimeInterval(pRuntimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) {
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
while (1) { while (1) {
tableIntervalProcessImpl(pRuntimeEnv); tableIntervalProcessImpl(pRuntimeEnv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册