提交 4a939018 编写于 作者: H Haojun Liao

[TD-225] refactor codes.

上级 434aefb8
...@@ -703,31 +703,53 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se ...@@ -703,31 +703,53 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep; return forwardStep;
} }
static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, int32_t* numOfClosed, TSKEY lastKey, bool ascQuery) {
int32_t i = 0;
int64_t skey = TSKEY_INITIAL_VAL;
for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) {
numOfClosed += 1;
continue;
}
TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeTimeWindow(pWindowResInfo, i);
} else {
skey = pResult->win.skey;
break;
}
}
// all windows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
}
/** /**
* NOTE: the query status only set for the first scan of master scan. * NOTE: the query status only set for the first scan of master scan.
* TODO refactor
*/ */
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN) { if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// for group by normal column query, close time window and return.
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
closeAllTimeWindow(pWindowResInfo);
return pWindowResInfo->size; return pWindowResInfo->size;
} }
// no qualified results exist, abort check // no qualified results exist, abort check
int32_t numOfClosed = 0; int32_t numOfClosed = 0;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
if (pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// query completed // query completed
if ((lastKey >= pQuery->current->win.ekey && QUERY_IS_ASC_QUERY(pQuery)) || if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) {
(lastKey <= pQuery->current->win.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo); closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
...@@ -744,8 +766,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -744,8 +766,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
} }
TSKEY ekey = pResult->win.ekey; TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
(pResult->win.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i); closeTimeWindow(pWindowResInfo, i);
} else { } else {
skey = pResult->win.skey; skey = pResult->win.skey;
...@@ -1050,24 +1071,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in ...@@ -1050,24 +1071,6 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
} }
} }
//static double getTSWindowInterpoVal(SColumnInfoData* pColInfo, int16_t srcColIndex, int16_t rowIndex, TSKEY key, char** prevRow, TSKEY* tsCols, int32_t step) {
// TSKEY start = tsCols[rowIndex];
// TSKEY prevTs = (rowIndex == 0)? *(TSKEY *) prevRow[0] : tsCols[rowIndex - step];
//
// double v1 = 0, v2 = 0, v = 0;
// char *prevVal = (rowIndex == 0)? prevRow[srcColIndex] : ((char*)pColInfo->pData) + (rowIndex - step) * pColInfo->info.bytes;
//
// GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal);
// GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + rowIndex * pColInfo->info.bytes);
//
// SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
// SPoint point2 = (SPoint){.key = start, .val = &v2};
// SPoint point = (SPoint){.key = key, .val = &v};
// taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
//
// return v;
//}
// window start key interpolation // window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
...@@ -1238,6 +1241,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1238,6 +1241,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
} }
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
...@@ -1249,6 +1254,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1249,6 +1254,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
} }
} }
...@@ -1289,6 +1296,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1289,6 +1296,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
} }
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
...@@ -1299,6 +1308,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * ...@@ -1299,6 +1308,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
} }
} }
...@@ -1802,9 +1813,12 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl ...@@ -1802,9 +1813,12 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied // interval query with limit applied
int32_t numOfRes = 0; int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else { } else if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo);
numOfRes = pWindowResInfo->size;
} else { // projection query
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
// update the number of output result // update the number of output result
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册