提交 86015bfc 编写于 作者: H hjxilinx

fix bugs founded in regression test.

上级 1fb55ade
......@@ -164,8 +164,7 @@ void pointInterpSupporterDestroy(SPointInterpoSupporter* pPointInterpSupport);
void pointInterpSupporterSetData(SQInfo* pQInfo, SPointInterpoSupporter* pPointInterpSupport);
int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv* pRuntimeEnv, SPositionInfo* position);
int32_t doCloseAllOpenedResults(STableQuerySupportObj* pSupporter);
void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
void disableFunctForSuppleScan(STableQuerySupportObj* pSupporter, int32_t order);
void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order);
int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter);
......@@ -238,13 +237,6 @@ void changeMeterQueryInfoForSuppleQuery(SQuery* pQuery, SMeterQueryInfo* pMeterQ
tFilePage* addDataPageForMeterQueryInfo(SQuery* pQuery, SMeterQueryInfo* pMeterQueryInfo,
STableQuerySupportObj* pSupporter);
/**
* save the query range data into SMeterQueryInfo
* @param pRuntimeEnv
* @param pMeterQueryInfo
*/
void saveIntervalQueryRange(SQueryRuntimeEnv* pRuntimeEnv, SMeterQueryInfo* pMeterQueryInfo);
/**
* restore the query range data from SMeterQueryInfo to runtime environment
*
......
......@@ -70,19 +70,17 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
static void resetMergeResultBuf(SQuery *pQuery, SQLFunctionCtx *pCtx, SResultInfo *pResultInfo);
static int32_t flushFromResultBuf(STableQuerySupportObj *pSupporter, const SQuery *pQuery,
const SQueryRuntimeEnv *pRuntimeEnv);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
static int32_t getGroupResultId(int32_t groupIndex) {
int32_t base = 200000;
return base + (groupIndex * 10000);
}
static FORCE_INLINE bool isIntervalQuery(SQuery* pQuery) {
return pQuery->intervalTime > 0;
}
static FORCE_INLINE bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// check the offset value integrity
static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data,
......@@ -1625,51 +1623,51 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
}
// query completed
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
int32_t i = 0;
int64_t skey = 0;
if ((lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo);
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) {
continue;
}
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
int32_t i = 0;
int64_t skey = 0;
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i);
} else {
skey = pResult->window.skey;
break;
}
for (i = 0; i < pWindowResInfo->size; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->status.closed) {
continue;
}
// all windows are closed, set the last one to be the skey
if (skey == 0) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i);
} else {
pWindowResInfo->curIndex = i;
skey = pResult->window.skey;
break;
}
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey;
// all windows are closed, set the last one to be the skey
if (skey == 0) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
}
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t n = numOfClosedTimeWindow(pWindowResInfo);
if (n > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].window.skey;
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n);
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t n = numOfClosedTimeWindow(pWindowResInfo);
if (n > pWindowResInfo->threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
assert(pWindowResInfo->prevSKey != 0);
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n);
}
assert(pWindowResInfo->prevSKey != 0);
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos,
......@@ -1797,7 +1795,7 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
}
}
static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) {
static TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQuery)) {
ekey = pWindow->ekey;
......@@ -1810,7 +1808,7 @@ static TSKEY reviseWindowEkey(SQuery* pQuery, STimeWindow* pWindow) {
ekey = pQuery->ekey;
}
}
return ekey;
}
......@@ -2142,11 +2140,11 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
if (isNull(pData, type)) { // ignore the null value
return -1;
}
int32_t GROUPRESULTID = 1;
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
if (pWindowRes == NULL) {
return -1;
......@@ -2159,7 +2157,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return -1;
}
}
setWindowResOutputBuf(pRuntimeEnv, pWindowRes);
initCtxOutputBuf(pRuntimeEnv);
return TSDB_CODE_SUCCESS;
......@@ -2384,10 +2382,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
continue;
}
}
// update the lastKey
lastKey = primaryKeyCol[offset];
// all startOffset are identical
offset -= pCtx[0].startOffset;
......@@ -2400,8 +2398,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
}
if (pRuntimeEnv->pTSBuf != NULL) {
// if timestamp filter list is empty, quit current query
// if timestamp filter list is empty, quit current query
if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
break;
......@@ -2506,7 +2503,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI
}
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); //todo refactor merge
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); // todo refactor merge
// interval query with limit applied
if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 &&
......@@ -2663,9 +2660,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes
// store the first&last timestamp into the intermediate buffer [1], the true
// value may be null but timestamp will never be null
pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE);
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA ||
functionId == TSDB_FUNC_DIFF || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
/*
* leastsquares function needs two columns of input, currently, the x value of linear equation is set to
* timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer
......@@ -2961,8 +2957,8 @@ bool isSumAvgRateQuery(SQuery *pQuery) {
continue;
}
if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE ||
functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) {
if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || functionId == TSDB_FUNC_AVG_RATE ||
functionId == TSDB_FUNC_AVG_IRATE) {
return true;
}
}
......@@ -4637,13 +4633,13 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
} else { // find the skey and ekey in case of sliding query
if (isIntervalQuery(pQuery)) {
STimeWindow win = {0};
// find the minimum value for descending order query
TSKEY minKey = -1;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
minKey = getGreaterEqualTimestamp(pRuntimeEnv);
}
int64_t skey = 0;
if ((normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, &interpInfo, &skey) == false) ||
(isFixedOutputQuery(pQuery) && !isTopBottomQuery(pQuery) && (pQuery->limit.offset > 0)) ||
......@@ -4654,7 +4650,7 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, STableQuery
pointInterpSupporterDestroy(&interpInfo);
return TSDB_CODE_SUCCESS;
}
if (!QUERY_IS_ASC_QUERY(pQuery)) {
win.skey = minKey;
win.ekey = skey;
......@@ -5853,51 +5849,62 @@ void setMeterDataInfo(SMeterDataInfo *pMeterDataInfo, SMeterObj *pMeterObj, int3
pMeterDataInfo->meterOrderIdx = meterIdx;
}
void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && isIntervalQuery(pQuery))) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo *pWindowResInfo, int32_t order) {
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i);
if (!pStatus->closed) {
continue;
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
SWindowResult *buf = getWindowResult(pWindowResInfo, i);
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, i);
if (!pStatus->closed) {
continue;
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) {
buf->resultInfo[j].complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
buf->resultInfo[j].complete = true;
}
}
}
}
SWindowResult *buf = getWindowResult(pWindowResInfo, i);
void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
SQuery *pQuery = pRuntimeEnv->pQuery;
assert(!pRuntimeEnv->stableQuery);
// open/close the specified query for each group result
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
int32_t functId = pQuery->pSelectExpr[j].pBase.functionId;
// group by normal columns and interval query on normal table
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
}
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) {
buf->resultInfo[j].complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
buf->resultInfo[j].complete = true;
}
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
pQuery->order.order = pQuery->order.order ^ 1;
}
void disableFunctForSuppleScan(STableQuerySupportObj *pSupporter, int32_t order) {
SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
if (isIntervalQuery(pQuery)) {
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
SMeterQueryInfo *pMeterQueryInfo = pSupporter->pMeterDataInfo[i].pMeterQInfo;
SWindowResInfo * pWindowResInfo = &pMeterQueryInfo->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
} else { // TODO ERROR!!
// need to handle for each query result, not just the single runtime ctx.
} else {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1;
int32_t functId = pQuery->pSelectExpr[i].pBase.functionId;
SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]);
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSQL_SO_DESC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSQL_SO_ASC)) {
pResInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
pResInfo->complete = true;
}
}
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order);
}
pQuery->order.order = pQuery->order.order ^ 1;
......@@ -6158,7 +6165,7 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) {
(!QUERY_IS_ASC_QUERY(pQuery) && endKey >= pQuery->ekey));
// close necessary function execution during supplementary scan
disableFunctForSuppleScan(pRuntimeEnv, pQuery->order.order);
disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order);
queryStatusSave(pRuntimeEnv, &qStatus);
doScanAllDataBlocks(pRuntimeEnv);
......@@ -6239,8 +6246,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
* round scan all data blocks.
*/
TSKEY key = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->startPos);
assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) ||
(!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey));
assert((QUERY_IS_ASC_QUERY(pQuery) && key >= pQuery->skey) || (!QUERY_IS_ASC_QUERY(pQuery) && key <= pQuery->skey));
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pQuery->lastKey = pQuery->skey;
......@@ -6525,34 +6531,8 @@ void changeMeterQueryInfoForSuppleQuery(SQuery *pQuery, SMeterQueryInfo *pMeterQ
SWAP(pMeterQueryInfo->skey, pMeterQueryInfo->ekey, TSKEY);
pMeterQueryInfo->lastKey = pMeterQueryInfo->skey;
// pMeterQueryInfo->queryRangeSet = 0;
pMeterQueryInfo->cur.order = pMeterQueryInfo->cur.order ^ 1;
pMeterQueryInfo->cur.vnodeIndex = -1;
// previous does not generate any results
// SIDList list = getDataBufPagesIdList(pResultBuf, pMeterQueryInfo->sid);
//
// if (list.size == 0) {
// pMeterQueryInfo->reverseFillRes = 0;
// } else {
// pMeterQueryInfo->reverseIndex = pMeterQueryInfo->numOfRes;
// pMeterQueryInfo->reverseFillRes = 1;
// }
}
void saveIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// pMeterQueryInfo->skey = pQuery->skey;
// pMeterQueryInfo->ekey = pQuery->ekey;
// pMeterQueryInfo->lastKey = pQuery->lastKey;
assert(((pQuery->lastKey >= pQuery->skey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pQuery->lastKey <= pQuery->skey) && !QUERY_IS_ASC_QUERY(pQuery)));
if (pRuntimeEnv->pTSBuf != NULL) {
pMeterQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf);
}
}
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, SMeterQueryInfo *pMeterQueryInfo) {
......@@ -7052,8 +7032,8 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
SWindowResInfo *pWindowResInfo = &pMeterQueryInfo->windowResInfo;
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp
// assert(pWindowResInfo->startTime > 0);
pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp
// assert(pWindowResInfo->startTime > 0);
if (pWindowResInfo->prevSKey == 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
......
......@@ -869,7 +869,7 @@ static void doMultiMeterSupplementaryScan(SQInfo *pQInfo) {
}
SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv);
disableFunctForSuppleScan(pRuntimeEnv, pQuery->order.order);
disableFunctForSuppleScan(pSupporter, pQuery->order.order);
if (pRuntimeEnv->pTSBuf != NULL) {
pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册