提交 eb39c4bf 编写于 作者: H hjxilinx

fix bugs in sliding query processing

上级 110b1d83
......@@ -1526,7 +1526,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf, sid);
if (list.size == 0) {
pData = getNewDataBuf(pResultBuf, sid, &pageId);
} else {
......@@ -1550,7 +1550,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
pWindowRes->pos.pageId = pageId;
pWindowRes->pos.rowId = pData->numOfElems++;
}
return 0;
}
......@@ -1564,8 +1564,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
return -1;
}
// not assign result buffer yet
// todo refactor
// not assign result buffer yet, add new result buffer
if (pWindowRes->pos.pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
......@@ -1604,6 +1603,45 @@ static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t s
return forwardStep;
}
static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { // query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllSlidingWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else {
int32_t i = 0;
int64_t skey = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeSlidingWindow(pWindowResInfo, i);
} else {
skey = pResult->window.skey;
break;
}
}
pWindowResInfo->prevSKey = skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
if (v > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
}
}
}
/**
*
* @param pRuntimeEnv
......@@ -1660,18 +1698,29 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
// continue;
return 0;
}
if (win.ekey < pBlockInfo->keyLast) {
forwardStep =
getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol);
if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor
if (win.ekey < pBlockInfo->keyLast) {
forwardStep =
getForwardStepsInBlock(pBlockInfo->size, searchFn, win.ekey, pQuery->pos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pBlockInfo->size - pQuery->pos;
}
} else {
if (win.skey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, win.skey, pQuery->pos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pQuery->pos + 1;
}
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = win.skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = pQuery->pos;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
......@@ -1683,6 +1732,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
while (1) {
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
pWindowResInfo->curIndex = index;
......@@ -1695,43 +1745,67 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
break;
}
if (pBlockInfo->keyFirst <= nextWin.skey && pBlockInfo->keyLast >= nextWin.skey) {
int32_t startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC);
// if (pBlockInfo->keyLast >= nextWin.skey && pBlockInfo->keyFirst <= nextWin.ekey) {
int32_t startPos = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.skey, TSQL_SO_ASC);
} else {
startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, nextWin.ekey, TSQL_SO_DESC);
}
/*
* This time window does not cover any data, try next time window
* when the time window is too small, this case may happen
*/
if ((primaryKeyCol[startPos] > nextWin.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) {
continue;
}
// null data, failed to allocate more memory buffer
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
TSDB_CODE_SUCCESS) {
pRuntimeEnv->windowResInfo.curIndex = index;
break;
}
// null data, failed to allocate more memory buffer
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
TSDB_CODE_SUCCESS) {
pRuntimeEnv->windowResInfo.curIndex = index;
break;
}
if (QUERY_IS_ASC_QUERY(pQuery)) { //todo refactor
if (nextWin.ekey < pBlockInfo->keyLast) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order,
primaryKeyCol);
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.ekey, startPos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = pBlockInfo->size - startPos;
}
} else {
if (nextWin.skey > pBlockInfo->keyFirst) {
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, nextWin.skey, startPos, pQuery->order.order, primaryKeyCol);
} else {
forwardStep = startPos + 1;
}
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = win.skey;
pCtx[k].size = forwardStep;
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
continue;
}
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
pCtx[k].nStartQueryTimestamp = nextWin.skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = startPos;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
continue;
}
} else {
pWindowResInfo->curIndex = index;
break;
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
}
// } else {
// pWindowResInfo->curIndex = index;
// break;
// }
}
pWindowResInfo->curIndex = index;
} else {
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
......@@ -1746,49 +1820,22 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
}
int64_t lastKey = pBlockInfo->keyLast;
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) {
// query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllSlidingWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else {
int32_t i = 0;
int64_t skey = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeSlidingWindow(pWindowResInfo, i);
} else {
skey = pResult->window.skey;
break;
}
}
pWindowResInfo->prevSKey = skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
if (v > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
}
/*
* No need to calculate the number of output results for group-by normal columns, interval query
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
if (pQuery->intervalTime == 0 && pQuery->slidingTime == 0) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
}
int64_t numOfIncrementRes = 0 /*getNumOfResult(pRuntimeEnv) - prevNumOfRes*/;
validateTimestampForSupplementResult(pRuntimeEnv, numOfIncrementRes);
validateTimestampForSupplementResult(pRuntimeEnv, num);
tfree(sasArray);
return (int32_t)numOfIncrementRes;
return (int32_t)num;
}
/**
......@@ -2275,44 +2322,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
free(sasArray);
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0 && IS_MASTER_SCAN(pRuntimeEnv)) {
// query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllSlidingWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else {
int32_t i = 0;
int64_t skey = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeSlidingWindow(pWindowResInfo, i);
} else {
skey = pResult->window.skey;
break;
}
}
pWindowResInfo->prevSKey = skey;
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
if (v > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
}
}
lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
/*
* No need to calculate the number of output results for groupby normal columns
* No need to calculate the number of output results for group-by normal columns, interval query
* because the results of group by normal column is put into intermediate buffer.
*/
int32_t num = 0;
......@@ -3481,6 +3495,70 @@ bool normalizeUnBoundLastRowQuery(SMeterQuerySupportObj *pSupporter, SPointInter
return getNeighborPoints(pSupporter, pMeterObj, pPointInterpSupporter);
}
static void getActualRange(SMeterQuerySupportObj *pSupporter, STimeWindow *pTimeWindow) {
SQueryRuntimeEnv * pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj * pMeterObj = pRuntimeEnv->pMeterObj;
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
int32_t order = pQuery->order.order;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
if (QUERY_IS_ASC_QUERY(pQuery)) { // do the desc check first for asc query
pQuery->order.order ^= 1;
TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (t > 0) {
pTimeWindow->ekey = t;
} else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) {
pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
pQuery->order.order = order;
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->lastKey = pQuery->skey;
if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else { // set no data in file
pQuery->fileId = -1;
pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
}
pQuery->skey = pTimeWindow->skey;
pQuery->ekey = pTimeWindow->ekey;
} else {
pQuery->order.order ^= 1;
if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_GREATER_EQUAL, searchFn)) {
pTimeWindow->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
} else { // set no data in file
pQuery->fileId = -1;
pTimeWindow->skey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
}
// reverse check for maxValue in query range
SWAP(pQuery->skey, pQuery->ekey, TSKEY);
pQuery->order.order ^= 1;
// set no data in file
pQuery->lastKey = pQuery->skey;
TSKEY t = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (t > 0) {
pTimeWindow->ekey = t;
} else if (getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn)) {
pTimeWindow->ekey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
pQuery->ekey = pTimeWindow->skey;
pQuery->skey = pTimeWindow->ekey;
}
pQuery->order.order = order;
}
/**
* determine the first query range, according to raw query range [skey, ekey] and group-by interval.
* the time interval for aggregating is not enforced to check its validation, the minimum interval is not less than
......@@ -4523,55 +4601,26 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
} else {
// find the skey and ekey in case of sliding query
// todo refactor
} else { // find the skey and ekey in case of sliding query
if (pQuery->slidingTime > 0 && pQuery->intervalTime > 0) {
int64_t skey = 0;
SWAP(pQuery->skey, pQuery->ekey, int64_t);
pQuery->order.order ^= 1;
pQuery->lastKey = pQuery->skey;
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) {
sem_post(&pQInfo->dataReady);
pQInfo->over = 1;
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
pQuery->skey = skey;
pQuery->order.order ^= 1;
SWAP(pQuery->skey, pQuery->ekey, int64_t);
int64_t ekey = 0;
pQuery->lastKey = pQuery->skey;
if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) {
//
}
pQuery->skey = ekey;
STimeWindow win = {0};
getActualRange(pSupporter, &win);
TSKEY skey1, ekey1;
TSKEY windowSKey = 0, windowEKey = 0;
TSKEY minKey = MIN(pQuery->skey, pQuery->ekey);
TSKEY maxKey = MAX(pQuery->skey, pQuery->ekey);
doGetAlignedIntervalQueryRangeImpl(pQuery, minKey, minKey, maxKey, &skey1, &ekey1, &windowSKey, &windowEKey);
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey,
&windowEKey);
pRuntimeEnv->windowResInfo.startTime = windowSKey;
pSupporter->rawSKey = pQuery->skey;
pSupporter->rawEKey = pQuery->ekey;
if (QUERY_IS_ASC_QUERY(pQuery)) {
pRuntimeEnv->windowResInfo.prevSKey = windowSKey;
} else {
pRuntimeEnv->windowResInfo.prevSKey =
windowSKey + ((pQuery->skey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime;
windowSKey + ((win.ekey - windowSKey) / pQuery->slidingTime) * pQuery->slidingTime;
}
pQuery->over = QUERY_NOT_COMPLETED;
} else {
int64_t ekey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &ekey) == false) ||
......@@ -5093,7 +5142,6 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTimeWindow->skey += (pQuery->slidingTime * factor);
......@@ -5853,13 +5901,11 @@ void enableFunctForMasterScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
pQuery->order.order = (pQuery->order.order ^ 1);
}
// todo dynamically add new slots
void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) {
int32_t numOfCols = pQuery->numOfOutputCols;
pResultRow->resultInfo = calloc((size_t)numOfCols, sizeof(SResultInfo));
pResultRow->pos =
*posInfo; // page->data + (pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage) + page->numOfElems*s1;
pResultRow->pos = *posInfo;
for (int32_t i = 0; i < numOfCols; ++i) {
SResultInfo *pResultInfo = &pResultRow->resultInfo[i];
......@@ -7073,15 +7119,15 @@ void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEnv, int64_t
int32_t setOutputBufferForIntervalQuery(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResInfo* pWindowResInfo = &pMeterQueryInfo->windowResInfo;
SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo;
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, pMeterQueryInfo->lastKey, pRuntimeEnv->pQuery);
SWindowResult *pWindowRes = doSetSlidingWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win.skey, TSDB_KEYSIZE);
if (pWindowRes == NULL) {
return -1;
}
// not allocated yet, allocate new buffer
if (pWindowRes->pos.pageId == -1) {
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pMeterQueryInfo->sid, pRuntimeEnv->numOfRowsPerPage);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册