提交 be4d72bf 编写于 作者: H Haojun Liao

refactor(query): refactor executor module to optimization the block load filter routine.

上级 5f9bce66
......@@ -72,7 +72,8 @@ typedef struct SResultRowInfo {
SResultRowPosition *pPosition;
int32_t size; // number of result set
int32_t capacity; // max capacity
int32_t curPos; // current active result row index of pResult list
// int32_t curPos; // current active result row index of pResult list
SResultRowPosition cur;
} SResultRowInfo;
struct STaskAttr;
......
......@@ -81,11 +81,12 @@ typedef struct SResultInfo { // TODO refactor
} SResultInfo;
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
uint64_t uid; // table uid
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
// uint64_t uid; // table uid
// int32_t groupIndex; // group id in table list
// SVariant tag;
SResultRowInfo resInfo; // result info
// SResultRowInfo resInfo; // result info
} STableQueryInfo;
typedef enum {
......
......@@ -52,11 +52,11 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
}
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
pResultRowInfo->size = 0;
pResultRowInfo->curPos = -1;
pResultRowInfo->capacity = size;
pResultRowInfo->size = 0;
pResultRowInfo->capacity = size;
pResultRowInfo->cur.pageId = -1;
pResultRowInfo->pPosition = taosMemoryCalloc(pResultRowInfo->capacity, sizeof(SResultRowPosition));
if (pResultRowInfo->pPosition == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -378,7 +378,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = true;
#if 0
int32_t code = TSDB_CODE_SUCCESS;
int32_t *posList = NULL;
......@@ -402,16 +402,16 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pTableList, i);
if (item->resInfo.size > 0) {
pTableQueryInfoList[numOfTables++] = item;
}
// if (item->resInfo.size > 0) {
// pTableQueryInfoList[numOfTables++] = item;
// }
}
// there is no data in current group
// no need to merge results since only one table in each group
if (numOfTables == 0) {
goto _end;
}
// if (numOfTables == 0) {
// goto _end;
// }
int32_t order = TSDB_ORDER_ASC;
SCompSupporter cs = {pTableQueryInfoList, posList, order};
......@@ -498,6 +498,7 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
#endif
return TSDB_CODE_SUCCESS;
}
......
......@@ -459,7 +459,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
if (p1 != NULL) {
if (pResultRowInfo->size == 0) {
existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table.
assert(pResultRowInfo->curPos == -1);
// assert(pResultRowInfo->curPos == -1);
} else if (pResultRowInfo->size == 1) {
SResultRowPosition* p = &pResultRowInfo->pPosition[0];
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
......@@ -485,10 +485,10 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
SResultRow* pResult = NULL;
if (!existInCurrentResusltRowInfo) {
// 1. close current opened time window
if (pResultRowInfo->curPos != -1) { // todo extract function
SResultRowPosition* pos = &pResultRowInfo->pPosition[pResultRowInfo->curPos];
SFilePage* pPage = getBufPage(pResultBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
if (pResultRowInfo->cur.pageId != -1) { // todo extract function
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
closeResultRow(pRow);
releaseBufPage(pResultBuf, pPage);
}
......@@ -508,12 +508,13 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
}
// 2. set the new time window to be the new active time window
pResultRowInfo->curPos = pResultRowInfo->size;
// pResultRowInfo->curPos = pResultRowInfo->size;
pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
int64_t index = pResultRowInfo->curPos;
// int64_t index = pResultRowInfo->curPos;
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES);
taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur, POINTER_BYTES);
} else {
pResult = getResultRowByPos(pResultBuf, p1);
}
......@@ -551,11 +552,11 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe
int32_t precision, STimeWindow* win) {
STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else {
w = getResultRow(pBuf, pResultRowInfo, pResultRowInfo->curPos)->win;
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
}
if (w.skey > ts || w.ekey < ts) {
......@@ -789,17 +790,17 @@ static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey
}
#endif
}
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
bool ascQuery, bool interp) {
if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo);
pResultRowInfo->curPos = pResultRowInfo->size - 1;
} else {
int32_t step = ascQuery ? 1 : -1;
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
}
}
//
//static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
// bool ascQuery, bool interp) {
// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
// closeAllResultRows(pResultRowInfo);
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// } else {
// int32_t step = ascQuery ? 1 : -1;
// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
// }
//}
static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos,
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
......@@ -1396,7 +1397,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
int32_t step = 1;
bool ascScan = true;
int32_t prevIndex = pResultRowInfo->curPos;
// int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
......@@ -1431,7 +1432,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// prev time window not interpolation yet.
int32_t curIndex = pResultRowInfo->curPos;
// int32_t curIndex = pResultRowInfo->curPos;
#if 0
if (prevIndex != -1 && prevIndex < curIndex && pInfo->timeWindowInterpo) {
......@@ -2725,12 +2726,12 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
// pTableQueryInfo->cur.vgroupIndex = -1;
// set the index to be the end slot of result rows array
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
if (pResultRowInfo->size > 0) {
pResultRowInfo->curPos = pResultRowInfo->size - 1;
} else {
pResultRowInfo->curPos = -1;
}
// SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
// if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// } else {
// pResultRowInfo->curPos = -1;
// }
}
void initResultRow(SResultRow* pResultRow) {
......@@ -2962,10 +2963,10 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
// set more initial size of interval/groupby query
// if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) {
int32_t initialSize = 128;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
// int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
// if (code != TSDB_CODE_SUCCESS) {
// return NULL;
// }
// } else { // in other aggregate query, do not initialize the windowResInfo
// }
......@@ -2978,7 +2979,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
}
// taosVariantDestroy(&pTableQueryInfo->tag);
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
// cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
......@@ -3174,13 +3175,13 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
* is a previous result generated or not.
*/
void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) {
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
if (pResultRowInfo->curPos != -1) {
return;
}
// SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
// if (pResultRowInfo->curPos != -1) {
// return;
// }
// pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQRange->ekey};
// STimeWindow win = {.skey = key, .ekey = pQRange->ekey};
/**
* In handling the both ascending and descending order super table query, we need to find the first qualified
......@@ -3188,10 +3189,10 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
* operations involve.
*/
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = TMIN(win.skey, win.ekey);
TSKEY ek = TMAX(win.skey, win.ekey);
// STimeWindow w = TSWINDOW_INITIALIZER;
//
// TSKEY sk = TMIN(win.skey, win.ekey);
// TSKEY ek = TMAX(win.skey, win.ekey);
// getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w);
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
......@@ -3792,19 +3793,6 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) {
// }
// }
static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = GET_TABLEGROUP(pRuntimeEnv, i);
size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j);
closeAllResultRows(&item->resInfo);
}
}
}
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
if (code == TSDB_CODE_SUCCESS) {
......@@ -4938,7 +4926,7 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi
int32_t offset = sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos];
SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
void* pPage = getBufPage(pSup->pResultBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
......@@ -5025,8 +5013,9 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi
initResultRow(resultRow);
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size;
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
pInfo->resultRowInfo.cur = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
}
if (offset != length) {
......@@ -5447,7 +5436,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pBlock->info.groupId);
// hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pBlock->info.groupId);
}
closeAllResultRows(&pInfo->binfo.resultRowInfo);
......@@ -5508,7 +5497,6 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
pOperator->status = OP_RES_TO_RETURN;
// pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
int64_t st = taosGetTimestampUs();
......@@ -5882,7 +5870,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
pTQueryInfo->uid = pk->uid;
// pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey;
// pTQueryInfo->groupIndex = i;
}
......
......@@ -205,7 +205,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
// }
// this function never returns error?
uint32_t status = FUNC_DATA_REQUIRED_NOT_LOAD;
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
......@@ -256,9 +256,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
if (pResultRowInfo->size > 0) {
pResultRowInfo->curPos = 0;
}
// if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = 0;
// }
qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
......@@ -275,7 +275,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
if (pResultRowInfo->size > 0) {
pResultRowInfo->curPos = pResultRowInfo->size - 1;
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
}
p = doTableScanImpl(pOperator, newgroup);
......
......@@ -30,6 +30,7 @@ EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWind
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t countFunction(SqlFunctionCtx *pCtx);
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t sumFunction(SqlFunctionCtx *pCtx);
......
......@@ -401,6 +401,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_SUM,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateSum,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getSumFuncEnv,
.initFunc = functionSetup,
.processFunc = sumFunction,
......@@ -411,6 +412,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_MIN,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateInOutNum,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = minFunctionSetup,
.processFunc = minFunction,
......@@ -421,6 +423,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_MAX,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateInOutNum,
.dataRequiredFunc = statisDataRequired,
.getEnvFunc = getMinmaxFuncEnv,
.initFunc = maxFunctionSetup,
.processFunc = maxFunction,
......
......@@ -183,6 +183,10 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册