提交 52cef5f9 编写于 作者: H Haojun Liao

[td-5126]<enhance>: optimize the outer query performance when handling the time window query.

上级 c5ee2fd5
...@@ -80,7 +80,7 @@ typedef struct SResultRow { ...@@ -80,7 +80,7 @@ typedef struct SResultRow {
uint32_t numOfRows; // number of rows of current time window uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
STimeWindow win; STimeWindow win;
char* key; // start key of current result row char *key; // start key of current result row
} SResultRow; } SResultRow;
typedef struct SGroupResInfo { typedef struct SGroupResInfo {
...@@ -105,7 +105,7 @@ typedef struct SResultRowInfo { ...@@ -105,7 +105,7 @@ typedef struct SResultRowInfo {
int16_t type:8; // data type for hash key int16_t type:8; // data type for hash key
int32_t size:24; // number of result set int32_t size:24; // number of result set
int32_t capacity; // max capacity int32_t capacity; // max capacity
int32_t curIndex; // current start active index SResultRow* current; // current start active index
int64_t prevSKey; // previous (not completed) sliding window start key int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo; } SResultRowInfo;
......
...@@ -426,13 +426,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes ...@@ -426,13 +426,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
} }
if (p1 != NULL) { if (p1 != NULL) {
for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) { pResultRowInfo->current = (*p1);
if (pResultRowInfo->pResult[i] == (*p1)) {
pResultRowInfo->curIndex = i;
existed = true; existed = true;
break;
}
}
} }
} else { } else {
if (p1 != NULL) { // group by column query if (p1 != NULL) { // group by column query
...@@ -457,8 +452,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes ...@@ -457,8 +452,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResult = *p1; pResult = *p1;
} }
pResultRowInfo->pResult[pResultRowInfo->size] = pResult; pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
pResultRowInfo->curIndex = pResultRowInfo->size++; pResultRowInfo->current = pResult;
} }
// too many time window in query // too many time window in query
...@@ -466,7 +461,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes ...@@ -466,7 +461,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
} }
return getResultRow(pResultRowInfo, pResultRowInfo->curIndex); return pResultRowInfo->current;
} }
static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
...@@ -497,7 +492,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin ...@@ -497,7 +492,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
getInitialStartTimeWindow(pQueryAttr, ts, &w); getInitialStartTimeWindow(pQueryAttr, ts, &w);
pResultRowInfo->prevSKey = w.skey; pResultRowInfo->prevSKey = w.skey;
...@@ -511,8 +506,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ...@@ -511,8 +506,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
w.ekey = w.skey + pQueryAttr->interval.interval - 1; w.ekey = w.skey + pQueryAttr->interval.interval - 1;
} }
} else { } else {
int32_t slot = curTimeWindowIndex(pResultRowInfo); // int32_t slot = curTimeWindowIndex(pResultRowInfo);
SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot); // SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
SResultRow* pWindowRes = pResultRowInfo->current;
w = pWindowRes->win; w = pWindowRes->win;
} }
...@@ -698,7 +694,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, ...@@ -698,7 +694,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
// all result rows are closed, set the last one to be the skey // all result rows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) { if (skey == TSKEY_INITIAL_VAL) {
pResultRowInfo->curIndex = pResultRowInfo->size - 1; if (pResultRowInfo->size == 0) {
// assert(pResultRowInfo->current == NULL);
pResultRowInfo->current = NULL;
} else {
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
}
} else { } else {
for (i = pResultRowInfo->size - 1; i >= 0; --i) { for (i = pResultRowInfo->size - 1; i >= 0; --i) {
...@@ -709,12 +710,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, ...@@ -709,12 +710,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
} }
if (i == pResultRowInfo->size - 1) { if (i == pResultRowInfo->size - 1) {
pResultRowInfo->curIndex = i; pResultRowInfo->current = pResultRowInfo->pResult[i];
} else { } else {
pResultRowInfo->curIndex = i + 1; // current not closed result object pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
} }
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey; pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
} }
} }
...@@ -722,7 +723,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer ...@@ -722,7 +723,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) { if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
pResultRowInfo->curIndex = pResultRowInfo->size - 1; pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
} else { } else {
int32_t step = ascQuery ? 1 : -1; int32_t step = ascQuery ? 1 : -1;
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo); doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo);
...@@ -1231,7 +1232,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1231,7 +1232,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo); SResultRow* prevRow = pResultRowInfo->current;
// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
...@@ -1260,9 +1262,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1260,9 +1262,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// prev time window not interpolation yet. // prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pResultRowInfo); // int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { // if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. // for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
if (prevRow != NULL && prevRow != pResultRowInfo->current && pQueryAttr->timeWindowInterpo) {
int32_t j = 0;
while(pResultRowInfo->pResult[j] != prevRow) {
j++;
}
for(; pResultRowInfo->pResult[j] != pResultRowInfo->current; ++j) {
SResultRow* pRes = pResultRowInfo->pResult[j]; SResultRow* pRes = pResultRowInfo->pResult[j];
if (pRes->closed) { if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
...@@ -3146,7 +3155,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl ...@@ -3146,7 +3155,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
} }
static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQueryInfo *pTableQueryInfo) { static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return; return;
} }
...@@ -3158,7 +3167,12 @@ static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQue ...@@ -3158,7 +3167,12 @@ static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQue
pTableQueryInfo->cur.vgroupIndex = -1; pTableQueryInfo->cur.vgroupIndex = -1;
// set the index to be the end slot of result rows array // set the index to be the end slot of result rows array
pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1; SResultRowInfo* pResRowInfo = &pTableQueryInfo->resInfo;
if (pResRowInfo->size > 0) {
pResRowInfo->current = pResRowInfo->pResult[pResRowInfo->size - 1];
} else {
pResRowInfo->current = NULL;
}
} }
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
...@@ -3172,7 +3186,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -3172,7 +3186,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
size_t t = taosArrayGetSize(group); size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) { for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
updateTableQueryInfoForReverseScan(pQueryAttr, pCheckInfo); updateTableQueryInfoForReverseScan(pCheckInfo);
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle // the start check timestamp of tsdbQueryHandle
...@@ -4571,7 +4585,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -4571,7 +4585,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
} }
if (pResultRowInfo->size > 0) { if (pResultRowInfo->size > 0) {
pResultRowInfo->curIndex = 0; pResultRowInfo->current = pResultRowInfo->pResult[0];
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
} }
...@@ -4597,8 +4611,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { ...@@ -4597,8 +4611,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
pTableScanInfo->order = cond.order; pTableScanInfo->order = cond.order;
if (pResultRowInfo->size > 0) { if (pResultRowInfo->size > 0) {
pResultRowInfo->curIndex = pResultRowInfo->size-1; pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
} }
p = doTableScanImpl(pOperator, newgroup); p = doTableScanImpl(pOperator, newgroup);
......
...@@ -45,7 +45,7 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t ...@@ -45,7 +45,7 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t
pResultRowInfo->type = type; pResultRowInfo->type = type;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pResultRowInfo->curIndex = -1; pResultRowInfo->current = NULL;
pResultRowInfo->capacity = size; pResultRowInfo->capacity = size;
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
...@@ -91,8 +91,8 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo ...@@ -91,8 +91,8 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
} }
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->current = NULL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册