提交 3aa3c820 编写于 作者: H hjxilinx

[TD-28]fix bugs in super table query during interpolate the skey/ekey values

上级 1d14c057
...@@ -279,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); ...@@ -279,6 +279,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo); void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1529,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t ...@@ -1529,14 +1529,6 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w.ekey = w.skey + pQuery->intervalTime - 1; w.ekey = w.skey + pQuery->intervalTime - 1;
} }
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
w.ekey = pQuery->ekey;
}
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0); assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
return w; return w;
...@@ -2055,36 +2047,37 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S ...@@ -2055,36 +2047,37 @@ static void doSetInterpolationDataForTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, S
SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) { SBlockInfo* pBlockInfo, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY* primaryKeyCol = (TSKEY*) pRuntimeEnv->primaryColBuffer->data;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pRuntimeEnv->interpoSearch) { if (!pRuntimeEnv->interpoSearch) {
int32_t s = startPos; return;
int32_t e = forwardStep * step + startPos - step; }
if (!QUERY_IS_ASC_QUERY(pQuery)) { int32_t s = startPos;
SWAP(s, e, int32_t); int32_t e = forwardStep * step + startPos - step;
if (!QUERY_IS_ASC_QUERY(pQuery)) {
SWAP(s, e, int32_t);
}
// interpolate for skey value
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) {
continue;
} }
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo; interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf); // interpolate for ekey value
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
if (pQuery->pSelectExpr[i].pBase.functionId != TSDB_FUNC_RATE) {
continue;
} }
// the first time window, do not employ the interpolation SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
if (primaryKeyCol[s] == pWindowResInfo->startTime) { interpolateEndKeyValue(pRuntimeEnv, pBlockInfo, win, e, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
pRuntimeEnv->pCtx[i].prev.key = -1;
}
} else {
for(int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SColIndexEx *pCol = &pQuery->pSelectExpr[i].pBase.colInfo;
interpolateStartKeyValue(pRuntimeEnv, pBlockInfo, pWindowResInfo, win, s, &pRuntimeEnv->pCtx[i], pCol->colIdxInBuf);
}
}
} }
} }
...@@ -2105,6 +2098,13 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe ...@@ -2105,6 +2098,13 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe
break; break;
} }
// do not check for the closed time window
SWindowResult* pWindowRes = getWindowRes(pWindowResInfo, slot);
if (pWindowRes->status.closed) {
slot += 1;
continue;
}
// if current active window locates before current data block, do interpolate the result and close it // if current active window locates before current data block, do interpolate the result and close it
assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) || assert((w.skey < win->skey && w.ekey < ts && QUERY_IS_ASC_QUERY(pQuery)) ||
(w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery))); (w.skey > win->skey && w.skey > ts && !QUERY_IS_ASC_QUERY(pQuery)));
...@@ -2112,10 +2112,10 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe ...@@ -2112,10 +2112,10 @@ static void doInterpolatePrevTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SWindowRe
int32_t forwardStep = 0; int32_t forwardStep = 0;
doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep); doSetInterpolationDataForTimeWindow(pRuntimeEnv, pWindowResInfo, pBlockInfo, &w, offset, forwardStep);
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo)); SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, slot);
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &w, pQuery->pos, forwardStep);
closeTimeWindow(pWindowResInfo, curTimeWindow(pWindowResInfo)); closeTimeWindow(pWindowResInfo, slot);
// try next time window // try next time window
slot += 1; slot += 1;
...@@ -2168,14 +2168,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2168,14 +2168,6 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, setExecParams(pRuntimeEnv, &pCtx[k], pQuery->skey, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField,
hasNull, &sasArray[k]); hasNull, &sasArray[k]);
} }
// save the last row in current data block
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (isIntervalQuery(pQuery)) { if (isIntervalQuery(pQuery)) {
...@@ -2248,6 +2240,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t ...@@ -2248,6 +2240,14 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
if (!isIntervalQuery(pQuery)) { if (!isIntervalQuery(pQuery)) {
num = getNumOfResult(pRuntimeEnv) - prevNumOfRes; num = getNumOfResult(pRuntimeEnv) - prevNumOfRes;
} }
// save the last row in current data block
for(int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo* pColInfo = &pQuery->colList[i].data;
int32_t s = (QUERY_IS_ASC_QUERY(pQuery))? pColInfo->bytes * (pBlockInfo->size - 1) : 0;
memcpy(pRuntimeEnv->lastRowInBlock[i], pRuntimeEnv->colDataBuffer[i]->data + s, pColInfo->bytes);
}
tfree(sasArray); tfree(sasArray);
return (int32_t)num; return (int32_t)num;
...@@ -2458,6 +2458,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -2458,6 +2458,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
} }
} }
SWindowResult* getWindowRes(SWindowResInfo* pWindowResInfo, size_t index) {
assert(index < pWindowResInfo->size);
return &pWindowResInfo->pResult[index];
}
/* /*
* remove the results that are not the FIRST time window that spreads beyond the * remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
...@@ -3276,6 +3281,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3276,6 +3281,12 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree(pRuntimeEnv->pInterpoBuf); tfree(pRuntimeEnv->pInterpoBuf);
} }
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
tfree(pRuntimeEnv->lastRowInBlock[i]);
}
tfree(pRuntimeEnv->lastRowInBlock);
destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf); destroyDiskbasedResultBuf(pRuntimeEnv->pResultBuf);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
...@@ -5220,6 +5231,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -5220,6 +5231,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pSupporter->rawEKey = pQuery->ekey; pSupporter->rawEKey = pQuery->ekey;
pSupporter->rawSKey = pQuery->skey; pSupporter->rawSKey = pQuery->skey;
pQuery->lastKey = pQuery->skey; pQuery->lastKey = pQuery->skey;
pRuntimeEnv->interpoSearch = needsBoundaryTS(pQuery);
// create runtime environment // create runtime environment
SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel; SColumnModel *pTagSchemaInfo = pSupporter->pSidSet->pColumnModel;
...@@ -7501,7 +7513,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO ...@@ -7501,7 +7513,6 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO
doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey);
pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp
// assert(pWindowResInfo->startTime > 0);
if (pWindowResInfo->prevSKey == 0) { if (pWindowResInfo->prevSKey == 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
...@@ -7806,6 +7817,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn ...@@ -7806,6 +7817,8 @@ void stableApplyFunctionsOnBlock(STableQuerySupportObj *pSupporter, SMeterDataIn
updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo); updateWindowResNumOfRes(pRuntimeEnv, pMeterDataInfo);
updatelastkey(pQuery, pMeterQueryInfo); updatelastkey(pQuery, pMeterQueryInfo);
doCheckQueryCompleted(pRuntimeEnv, pMeterQueryInfo->lastKey, pWindowResInfo);
} }
// we need to split the refstatsult into different packages. // we need to split the refstatsult into different packages.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册