提交 4c04ce09 编写于 作者: H Haojun Liao

[TD-2434]<enhance>: reduce the memory requirement during super table interval query.

上级 79ffafb3
......@@ -388,10 +388,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
return;
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
return;
}
......
......@@ -2597,14 +2597,23 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
}
//////////////////////////////////////////////////////////////////////////////////
/*
 * Re-derive the histogram pointers kept inside an SAPercentileInfo.
 *
 * pInfo->pHisto is set to the address sizeof(SAPercentileInfo) bytes past pInfo, and
 * pInfo->pHisto->elems to the address sizeof(SHistogramInfo) bytes past that.
 */
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
  char* histoAddr = (char*) pInfo + sizeof(SAPercentileInfo);
  char* binsAddr  = histoAddr + sizeof(SHistogramInfo);

  pInfo->pHisto = (SHistogramInfo*) histoAddr;
  pInfo->pHisto->elems = (SHistBin*) binsAddr;
}
/*
 * Select the SAPercentileInfo buffer that belongs to pCtx.
 *
 * When pCtx->stableQuery is set and pCtx->currentStage is not SECONDARY_STAGE_MERGE the buffer is
 * pCtx->aOutputBuf; otherwise it is GET_ROWCELL_INTERBUF() of the context's result cell info.
 *
 * NOTE(review): each branch lists a bare `return` immediately followed by an assignment of the
 * same expression to pInfo. As written, the assignments and the trailing buildHistogramInfo()
 * call are unreachable. This looks like the before/after lines of one change shown together --
 * confirm which variant is the intended one.
 */
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo* pInfo = NULL;
if (pCtx->stableQuery && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (SAPercentileInfo*) pCtx->aOutputBuf;
pInfo = (SAPercentileInfo*) pCtx->aOutputBuf;
} else {
return GET_ROWCELL_INTERBUF(pResInfo);
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
}
// refresh pInfo->pHisto / pHisto->elems so they point into the selected buffer
buildHistogramInfo(pInfo);
return pInfo;
}
static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
......@@ -2616,6 +2625,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true;
}
......@@ -2624,6 +2634,8 @@ static void apercentile_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx);
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
assert(pInfo->pHisto->elems != NULL);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
......
......@@ -33,13 +33,6 @@ struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, char* val1, char* val2);
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
/*
 * Group result bookkeeping record made of four int32_t fields.
 *
 * NOTE(review): a second definition of SGroupResInfo (rowId / index / pRows) appears further
 * down in this listing. The two cannot coexist in one translation unit; this one is presumably
 * the superseded layout -- confirm.
 */
typedef struct SGroupResInfo {
int32_t groupId;
int32_t numOfDataPages;
int32_t pageId;
int32_t rowId;
} SGroupResInfo;
typedef struct SResultRowPool {
int32_t elemSize;
int32_t blockSize;
......@@ -72,6 +65,12 @@ typedef struct SResultRow {
union {STimeWindow win; char* key;}; // start key of current time window
} SResultRow;
/*
 * Group result bookkeeping record: an array of SResultRow pointers plus two int32_t fields.
 * NOTE(review): rowId and index are presumably cursors into pRows -- confirm against the code
 * that fills and consumes this struct.
 */
typedef struct SGroupResInfo {
int32_t rowId;
int32_t index;
SArray* pRows; // SArray<SResultRow*>
} SGroupResInfo;
/**
* If the number of generated results is greater than this value,
* the query will be halted and the results returned to the client immediately.
......@@ -89,7 +88,6 @@ typedef struct SResultRowInfo {
int32_t size:24; // number of result set
int32_t capacity; // max capacity
int32_t curIndex; // current start active index
int64_t startTime; // start time of the first time window for sliding query
int64_t prevSKey; // previous (not completed) sliding window start key
} SResultRowInfo;
......
......@@ -67,7 +67,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto);
void tHistogramPrint(SHistogramInfo* pHisto);
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val);
//int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHeapEntry* tHeapCreate(int32_t numOfEntries);
void tHeapSort(SHeapEntry* pEntry, int32_t len);
......
......@@ -77,7 +77,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p);
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv);
bool isPointInterpoQuery(SQuery *pQuery);
......
......@@ -178,11 +178,10 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
#define IS_STASBLE_QUERY_OVER(_q) ((_q)->tableIndex >= (int32_t)((_q)->tableqinfoGroupInfo.numOfTables))
// todo move to utility
static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *group);
static int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo);
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
......@@ -195,7 +194,6 @@ static bool hasMainOutput(SQuery *pQuery);
static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo);
static int32_t checkForQueryBuf(size_t numOfTables);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
......@@ -291,7 +289,7 @@ void updateNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfRes) {
}
}
/*
 * Map a group index to a merge-result group id: 50000000 + groupIndex * 10000.
 *
 * NOTE(review): the arithmetic is performed in int32_t. With a 32-bit int the result overflows
 * (undefined behavior for signed integers) once groupIndex exceeds 209748 -- confirm that callers
 * keep groupIndex below that bound.
 * NOTE(review): two signature lines are listed, with and without UNUSED_FUNC; presumably the
 * UNUSED_FUNC form is the current one -- confirm.
 */
static int32_t getMergeResultGroupId(int32_t groupIndex) {
static UNUSED_FUNC int32_t getMergeResultGroupId(int32_t groupIndex) {
int32_t base = 50000000;
return base + (groupIndex * 10000);
}
......@@ -466,16 +464,34 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
int32_t *p1 =
(int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
if (p1 != NULL) {
pResultRowInfo->curIndex = *p1;
SResultRow **p1 =
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
// in case of repeat scan/reverse scan, no new time window added.
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
return (p1 != NULL)? *p1:NULL;
}
if (p1 != NULL) {
for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) {
if (pResultRowInfo->pResult[i] == (*p1)) {
pResultRowInfo->curIndex = i;
existed = true;
break;
}
}
}
} else {
if (!masterscan) { // not master scan, do not add new timewindow
return NULL;
if (p1 != NULL) { // group by column query
return *p1;
}
}
if (!existed) {
// TODO refactor
// more than the capacity, reallocate the resources
if (pResultRowInfo->size >= pResultRowInfo->capacity) {
......@@ -499,17 +515,23 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResultRowInfo->capacity = (int32_t)newCapacity;
}
SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool);
pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
SResultRow *pResult = NULL;
if (p1 == NULL) {
pResult = getNewResultRow(pRuntimeEnv->pool);
int32_t ret = initResultRow(pResult);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// add a new result set for a new group
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES);
} else {
pResult = *p1;
}
// add a new result set for a new group
pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
pResultRowInfo->curIndex = pResultRowInfo->size++;
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes),
(char *)&pResultRowInfo->curIndex, sizeof(int32_t));
}
// too many time window in query
......@@ -591,7 +613,6 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
if (pData->num >= numOfRowsPerPage) {
// release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId);
if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
......@@ -614,24 +635,20 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0;
}
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo,
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow** pResult, int64_t groupId) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// todo refactor
int64_t uid = getResultInfoUId(pRuntimeEnv);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
if (pResultRow == NULL) {
*newWind = false;
return masterscan? -1:0; // no master scan, no result generated means error occurs
*pResult = NULL;
return TSDB_CODE_SUCCESS;
}
*newWind = true;
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, pBockInfo->tid, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupId, pRuntimeEnv->numOfRowsPerPage);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -701,81 +718,47 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep;
}
/*
 * NOTE(review): the lines below interleave three definitions: updateResultRowCurrentIndex() and
 * doCheckQueryCompleted(), which operate on pWindowResInfo and return numOfClosed, and
 * doUpdateResultRowIndex(), which operates on pResultRowInfo and returns nothing. This is
 * presumably the before/after text of one change shown together; the span is not compilable as
 * listed -- confirm against the real file before editing any statement here.
 *
 * What the pResultRowInfo-based lines do: walk pResult[] from the last entry towards the first,
 * stop at the first row already marked closed, close every row whose window lies entirely before
 * lastKey (win.ekey <= lastKey when ascending, win.skey >= lastKey when descending), and record
 * the skey of a row that stays open. If no row stayed open, curIndex is set to size - 1.
 * Otherwise a second backward walk finds the last closed row, curIndex becomes the entry after
 * it, and prevSKey is set to that entry's win.skey.
 */
static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) {
int32_t i = 0;
static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, bool ascQuery) {
int64_t skey = TSKEY_INITIAL_VAL;
int32_t numOfClosed = 0;
for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i];
int32_t i = 0;
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
numOfClosed += 1;
continue;
break;
}
TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pWindowResInfo, i);
// new closed result rows
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeResultRow(pResultRowInfo, i);
} else {
skey = pResult->win.skey;
break;
}
}
// all windows are closed, set the last one to be the skey
// all result rows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
}
return numOfClosed;
}
/**
* NOTE: the query status only set for the first scan of master scan.
*/
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->scanFlag != MASTER_SCAN || pWindowResInfo->size == 0) {
return pWindowResInfo->size;
}
// no qualified results exist, abort check
int32_t numOfClosed = 0;
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// query completed
if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) {
closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window
numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery);
// the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pQuery->rec.threshold) {
qDebug("QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return",
GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, numOfClosed, pQuery->rec.threshold);
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
} else {
qDebug("QInfo:%p total result window:%d already closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size,
numOfClosed);
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
SResultRow *pResult = pResultRowInfo->pResult[i];
if (pResult->closed) {
break;
}
}
}
// output has reached the limitation, set query completed
if (pQuery->limit.limit > 0 && (pQuery->limit.limit + pQuery->limit.offset) <= numOfClosed &&
pRuntimeEnv->scanFlag == MASTER_SCAN) {
setQueryStatus(pQuery, QUERY_COMPLETED);
pResultRowInfo->curIndex = i + 1; // current not closed result object
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
}
}
assert(pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL);
return numOfClosed;
/*
 * Refresh the current index of a result-row set from the table's lastKey.
 *
 * When lastKey has reached the end of the table's query window (>= win.ekey for an ascending
 * scan, <= win.ekey otherwise), every result row is closed and curIndex points at the last row.
 * In every other case the update is delegated to doUpdateResultRowIndex().
 */
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery) {
  bool windowExhausted = ascQuery ? (pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey)
                                  : (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey);

  if (!windowExhausted) {
    doUpdateResultRowIndex(pResultRowInfo, pTableQueryInfo->lastKey, ascQuery);
    return;
  }

  closeAllResultRows(pResultRowInfo);
  pResultRowInfo->curIndex = pResultRowInfo->size - 1;
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
......@@ -818,52 +801,47 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num;
}
/*
 * Apply every output function of the query to a run of rows of the current data block.
 *
 * For each output column k in [0, pQuery->numOfOutput) the context pCtx[k] is prepared:
 * nStartQueryTimestamp = pWin->skey, size = forwardStep, and startOffset = offset for an
 * ascending query or offset - (forwardStep - 1) otherwise. ptsList is pointed into tsCol for
 * functions flagged TSDB_FUNCSTATE_SELECTIVITY. preAggVals.isSet is cleared while
 * forwardStep < numOfTotal, i.e. when only part of the block is involved. xFunction is then
 * invoked if functionNeedToExecute() allows it, and preAggVals.isSet is set back to hasPrev.
 *
 * NOTE(review): two signatures are listed (with and without the `closed` parameter) and the loop
 * body appears twice, once under `if (IS_MASTER_SCAN(pRuntimeEnv) || closed)`. This is presumably
 * the before/after text of one change shown together -- confirm which variant is current.
 * NOTE(review): hasPrev is read from pCtx[0] only but is written back to every pCtx[k] --
 * confirm that all contexts share the same isSet value on entry.
 */
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset,
int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery * pQuery = pRuntimeEnv->pQuery;
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool hasPrev = pCtx[0].preAggVals.isSet;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx[k].ptsList = &tsCol[pCtx[k].startOffset];
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
// restore it
pCtx[k].preAggVals.isSet = hasPrev;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
// restore it
pCtx[k].preAggVals.isSet = hasPrev;
}
}
/*
 * Apply every output function of the query to a single row of the current data block.
 *
 * For each output column k in [0, pQuery->numOfOutput), pCtx[k].nStartQueryTimestamp is set to
 * pWin->skey, and the function's row-wise entry point xFunctionF is invoked with `offset` if
 * functionNeedToExecute() allows it.
 *
 * NOTE(review): two signatures are listed (with and without the `closed` parameter) and the loop
 * appears twice, once under `if (IS_MASTER_SCAN(pRuntimeEnv) || closed)`. This is presumably the
 * before/after text of one change shown together -- confirm which variant is current.
 */
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset) {
SQuery * pQuery = pRuntimeEnv->pQuery;
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
if (IS_MASTER_SCAN(pRuntimeEnv) || closed) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
......@@ -1178,7 +1156,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
TSKEY *tsCols = NULL;
if (pDataBlock != NULL) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
......@@ -1203,56 +1183,46 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) {
tfree(sasArray);
return;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
goto _end;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
// in case of repeat scan/reverse scan, no new time window added.
if (hasTimeWindow) {
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) {
SResultRow *pRes = pWindowResInfo->pResult[j];
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &w, masterScan, &hasTimeWindow, &pResult);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pRuntimeEnv->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) {
SResultRow *pRes = pWindowResInfo->pResult[j];
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
}
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
assert (ret == TSDB_CODE_SUCCESS); // null data, too many state code
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows);
}
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
assert (ret == TSDB_CODE_SUCCESS);
}
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pRuntimeEnv, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
......@@ -1261,27 +1231,19 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
if (!hasTimeWindow) {
continue;
}
TSKEY ekey = reviseWindowEkey(pQuery, &nextWin);
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows);
}
pWindowResInfo->curIndex = index;
} else {
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
......@@ -1297,6 +1259,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
}
_end:
if (pRuntimeEnv->timeWindowInterpo) {
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock);
}
......@@ -1317,8 +1280,6 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return -1;
}
int32_t GROUPRESULTID = 1;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// not assign result buffer yet, add new result buffer, TODO remove it
......@@ -1334,11 +1295,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
uint64_t uid = groupIndex;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid);
if (pResultRow == NULL) {
return -1;
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, groupIndex);
assert (pResultRow != NULL);
int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData);
......@@ -1355,7 +1313,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
}
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage);
if (ret != 0) {
return -1;
}
......@@ -1490,7 +1448,7 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa
double v1 = 0, v2 = 0, v = 0;
if (prevRowIndex == -1) {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[k]);
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pRuntimeEnv->prevRow[index]);
} else {
GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes);
}
......@@ -1557,6 +1515,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
int64_t groupId = item->groupIndex;
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
......@@ -1627,15 +1587,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
if (!hasTimeWindow) {
continue;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // null data, too many state code
goto _end;
}
// window start key interpolation
......@@ -1646,18 +1601,15 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
SResultRow *pRes = pWindowResInfo->pResult[k];
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &pRes->win, masterScan, &hasTimeWindow, &pResult);
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win);
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &pRes->win, offset);
doRowwiseApplyFunctions(pRuntimeEnv, &pRes->win, offset);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow,
&pResult);
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
......@@ -1666,8 +1618,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win);
}
bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset);
doRowwiseApplyFunctions(pRuntimeEnv, &win, offset);
STimeWindow nextWin = win;
int32_t index = pWindowResInfo->curIndex;
......@@ -1684,16 +1635,13 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
// null data, failed to allocate more memory buffer
hasTimeWindow = false;
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) {
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
if (hasTimeWindow) {
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo));
doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset);
}
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset);
}
pWindowResInfo->curIndex = index;
......@@ -1728,6 +1676,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
_end:
assert(offset >= 0);
if (tsCols != NULL) {
item->lastKey = tsCols[offset] + step;
......@@ -1755,28 +1704,26 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQInfo = pQuery->current;
SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo* pResultRowInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
// update the lastkey of current table
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else if (pRuntimeEnv->groupbyNormalCol) {
closeAllResultRows(pWindowResInfo);
numOfRes = pWindowResInfo->size;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
numOfRes = pResultRowInfo->size;
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
} else { // projection query
numOfRes = (int32_t)getNumOfResult(pRuntimeEnv);
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
// update the number of output result
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
......@@ -1791,8 +1738,8 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (((pTableQInfo->lastKey > pTableQInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQInfo->lastKey < pTableQInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
if (((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
}
......@@ -2610,6 +2557,8 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
*status = BLK_DATA_NO_NEEDED;
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
SQueryCostInfo* pCost = &pRuntimeEnv->summary;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
......@@ -2626,15 +2575,13 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
// The pCtx[i] result belongs to the previous time window because the outputBuf has not been set yet, so
// the filter result may be incorrect. For an interval query, we therefore need to set the correct time window output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool hasTimeWindow = false;
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) !=
TSDB_CODE_SUCCESS) {
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
......@@ -2832,13 +2779,9 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
} else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
} else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w);
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
}
......@@ -2908,13 +2851,9 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
closeAllResultRows(&pRuntimeEnv->windowResInfo);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
} else {
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
closeAllResultRows(&pRuntimeEnv->windowResInfo);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
}
return 0;
......@@ -3019,7 +2958,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
}
}
static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) {
static UNUSED_FUNC void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SResultRow *pWindowRes, bool mergeFlag) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
......@@ -3168,19 +3107,18 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
/*
 * Comparator context handed to tableResultComparFn() through its `param` argument.
 *
 * NOTE(review): this listing carries two index/auxiliary field sets (position + pQInfo, and
 * rowIndex + order); they look like the before/after forms of the same data -- confirm which
 * ones the current comparator actually reads.
 */
typedef struct SCompSupporter {
// per-source table query info; indexed by the source id the comparator receives
STableQueryInfo **pTableQueryInfo;
// per-source position of the current result row; the comparator treats -1 as "source exhausted"
int32_t * position;
// owning query; the comparator reaches the runtime env through it
SQInfo * pQInfo;
// per-source index of the current result row, passed to getResultRow(); -1 marks an exhausted source
int32_t *rowIndex;
// scan order; compared against TSDB_ORDER_ASC to pick the timestamp comparison direction
int32_t order;
} SCompSupporter;
int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
SCompSupporter * supporter = (SCompSupporter *)param;
SQueryRuntimeEnv *pRuntimeEnv = &supporter->pQInfo->runtimeEnv;
int32_t leftPos = supporter->position[left];
int32_t rightPos = supporter->position[right];
int32_t leftPos = supporter->rowIndex[left];
int32_t rightPos = supporter->rowIndex[right];
/* left source is exhausted */
if (leftPos == -1) {
......@@ -3192,53 +3130,55 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
return -1;
}
SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
STableQueryInfo** pList = supporter->pTableQueryInfo;
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
TSKEY leftTimestamp = GET_INT64_VAL(b1);
SResultRowInfo *pWindowResInfo1 = &(pList[left]->windowResInfo);
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
TSKEY leftTimestamp = pWindowRes1->win.skey;
SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
SResultRowInfo *pWindowResInfo2 = &(pList[right]->windowResInfo);
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
char *b2 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes2, page2);
TSKEY rightTimestamp = GET_INT64_VAL(b2);
TSKEY rightTimestamp = pWindowRes2->win.skey;
if (leftTimestamp == rightTimestamp) {
return 0;
}
return leftTimestamp > rightTimestamp ? 1 : -1;
if (supporter->order == TSDB_ORDER_ASC) {
return (leftTimestamp > rightTimestamp)? 1:-1;
} else {
return (leftTimestamp < rightTimestamp)? 1:-1;
}
}
int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
int32_t mergeGroupResult(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
int32_t ret = TSDB_CODE_SUCCESS;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
while (pQInfo->groupIndex < numOfGroups) {
SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
ret = mergeIntoGroupResultImpl(pQInfo, group);
if (ret < 0) { // not enough disk space to save the data into disk
int32_t ret = mergeIntoGroupResultImpl(pGroupResInfo, group, pQInfo);
if (ret < 0) {
return -1;
}
pQInfo->groupIndex += 1;
// this group generates at least one result, return results
if (ret > 0) {
pQInfo->groupIndex += 1;
if (taosArrayGetSize(pGroupResInfo->pRows) > 0) {
break;
}
assert(pQInfo->groupResInfo.numOfDataPages == 0);
qDebug("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1);
taosArrayClear(pGroupResInfo->pRows);
pGroupResInfo->index = 0;
pGroupResInfo->rowId = 0;
}
SGroupResInfo* info = &pQInfo->groupResInfo;
if (pQInfo->groupIndex == numOfGroups && info->pageId == info->numOfDataPages) {
if (pQInfo->groupIndex == numOfGroups && taosArrayGetSize(pGroupResInfo->pRows) == 0) {
SET_STABLE_QUERY_OVER(pQInfo);
}
......@@ -3250,89 +3190,28 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t* index, int32_t orderType);
void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
// all results have been returned to the client, try the next group
if (pGroupResInfo->pageId == pGroupResInfo->numOfDataPages) {
pGroupResInfo->numOfDataPages = 0;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
// all results in current group have been returned to client, try next group
if (pGroupResInfo->index >= taosArrayGetSize(pGroupResInfo->pRows)) {
// the results of the current group have been sent to the client, try the next group
if (mergeIntoGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
if (mergeGroupResult(pQInfo) != TSDB_CODE_SUCCESS) {
return; // failed to save data in the disk
}
// check if all results have been sent to the client
int32_t numOfGroup = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
if (pGroupResInfo->numOfDataPages == 0 && pQInfo->groupIndex == numOfGroup) {
if (taosArrayGetSize(pGroupResInfo->pRows) == 0 && pQInfo->groupIndex == numOfGroup) {
SET_STABLE_QUERY_OVER(pQInfo);
return;
}
}
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t id = pQInfo->groupResInfo.groupId;
SIDList list = getDataBufPagesIdList(pResultBuf, id);
int32_t offset = 0;
int32_t numOfCopiedRows = 0;
size_t size = taosArrayGetSize(list);
assert(size == pGroupResInfo->numOfDataPages);
bool done = false;
//TODO add API for release none-dirty pages
// SPageInfo* prev = NULL;
for (int32_t j = pGroupResInfo->pageId; j < size; ++j) {
SPageInfo* pi = *(SPageInfo**) taosArrayGet(list, j);
tFilePage* pData = getResBufPage(pResultBuf, pi->pageId);
// release previous buffer pages
// if (prev == NULL) {
// prev = pi;
// } else {
// if (prev->pageId != pi->pageId) {
// releaseResBufPageInfo(pResultBuf, prev);
// prev = pi;
// }
// }
assert(pData->num > 0 && pData->num <= pRuntimeEnv->numOfRowsPerPage && pGroupResInfo->rowId < pData->num);
int32_t numOfRes = (int32_t)(pData->num - pGroupResInfo->rowId);
if (numOfRes > pQuery->rec.capacity - offset) {
numOfCopiedRows = (int32_t)(pQuery->rec.capacity - offset);
pGroupResInfo->rowId += numOfCopiedRows;
done = true;
} else {
numOfCopiedRows = (int32_t)pData->num;
pGroupResInfo->pageId += 1;
pGroupResInfo->rowId = 0;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char * pDest = pQuery->sdata[i]->data;
memcpy(pDest + offset * bytes, pData->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage,
(size_t)bytes * numOfCopiedRows);
}
offset += numOfCopiedRows;
if (done) {
break;
}
}
assert(pQuery->rec.rows == 0);
pQuery->rec.rows += offset;
int32_t size = (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
pQuery->rec.rows = doCopyToSData(pQInfo, pGroupResInfo->pRows->pData, (int32_t) size, &pGroupResInfo->index, TSDB_ORDER_ASC);
}
int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow) {
......@@ -3360,155 +3239,99 @@ int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResu
return 0;
}
int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int32_t mergeIntoGroupResultImpl(SGroupResInfo* pGroupResInfo, SArray *pTableList, SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQuery);
int32_t code = TSDB_CODE_SUCCESS;
size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata;
int32_t *posList = NULL;
SLoserTreeInfo *pTree = NULL;
STableQueryInfo **pTableQueryInfoList = NULL;
int32_t *posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
size_t size = taosArrayGetSize(pTableList);
if (pGroupResInfo->pRows == NULL) {
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
}
if (pTableList == NULL || posList == NULL) {
tfree(posList);
tfree(pTableList);
posList = calloc(size, sizeof(int32_t));
pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL) {
qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
// todo opt for the case of one table per group
int32_t numOfTables = 0;
SIDList pageList = NULL;
int32_t tid = -1;
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables++] = item;
tid = TSDB_TABLEID(item->pTable)->tid;
pageList = list;
STableQueryInfo *item = taosArrayGetP(pTableList, i);
if (item->windowResInfo.size > 0) {
pTableQueryInfoList[numOfTables++] = item;
}
}
// there is no data in current group
// no need to merge results since only one table in each group
if (numOfTables == 0) {
tfree(posList);
tfree(pTableList);
return 0;
} else if (numOfTables == 1) { // no need to merge results since only one table in each group
tfree(posList);
tfree(pTableList);
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
pGroupResInfo->numOfDataPages = (int32_t)taosArrayGetSize(pageList);
pGroupResInfo->groupId = tid;
pGroupResInfo->pageId = 0;
pGroupResInfo->rowId = 0;
return pGroupResInfo->numOfDataPages;
goto _end;
}
SCompSupporter cs = {pTableList, posList, pQInfo};
SLoserTreeInfo *pTree = NULL;
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQuery->order.order};
SResultRow* pRow = getNewResultRow(pRuntimeEnv->pool);
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
pQInfo->groupResInfo.groupId = getMergeResultGroupId(pQInfo->groupIndex);
int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
if (ret != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end;
}
// todo add windowRes iterator
int64_t lastTimestamp = -1;
int64_t lastTimestamp = ascQuery? INT64_MIN:INT64_MAX;
int64_t startt = taosGetTimestampMs();
while (1) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
tfree(pTableList);
tfree(posList);
tfree(pTree);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
code = TSDB_CODE_TSC_QUERY_CANCELLED;
goto _end;
}
int32_t pos = pTree->pNode[0].index;
SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
int32_t tableIndex = pTree->pNode[0].index;
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
TSKEY ts = GET_INT64_VAL(b);
SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->windowResInfo;
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]);
assert(ts == pWindowRes->win.skey);
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes);
if (num <= 0) {
cs.position[pos] += 1;
cs.rowIndex[tableIndex] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// all input sources are exhausted
if (--numOfTables == 0) {
if (cs.rowIndex[tableIndex] >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
if (--numOfTables == 0) { // all input sources are exhausted
break;
}
}
} else {
if (ts == lastTimestamp) { // merge with the last one
doMerge(pRuntimeEnv, ts, pWindowRes, true);
} else { // copy data to disk buffer
if (buffer[0]->num == pQuery->rec.capacity) {
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
return -1;
}
resetMergeResultBuf(pRuntimeEnv, pRuntimeEnv->pCtx, pRow);
}
assert((pWindowRes->win.skey >= lastTimestamp && ascQuery) || (pWindowRes->win.skey <= lastTimestamp && !ascQuery));
doMerge(pRuntimeEnv, ts, pWindowRes, false);
buffer[0]->num += 1;
if (pWindowRes->win.skey != lastTimestamp) {
taosArrayPush(pGroupResInfo->pRows, &pWindowRes);
pWindowRes->numOfRows = num;
}
lastTimestamp = ts;
lastTimestamp = pWindowRes->win.skey;
// move to the next element of current entry
int32_t currentPageId = pWindowRes->pageId;
cs.position[pos] += 1;
if (cs.position[pos] >= pWindowResInfo->size) {
cs.position[pos] = -1;
// move to the next row of current entry
if ((++cs.rowIndex[tableIndex]) >= pWindowResInfo->size) {
cs.rowIndex[tableIndex] = -1;
// all input sources are exhausted
if (--numOfTables == 0) {
if ((--numOfTables) == 0) {
break;
}
} else {
// current page is not needed anymore
SResultRow *pNextWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
if (pNextWindowRes->pageId != currentPageId) {
releaseResBufPage(pRuntimeEnv->pResultBuf, page);
}
}
}
tLoserTreeAdjust(pTree, pos + pTree->numOfEntries);
}
if (buffer[0]->num != 0) { // there are data in buffer
if (flushFromResultBuf(pRuntimeEnv, &pQInfo->groupResInfo) != TSDB_CODE_SUCCESS) {
qError("QInfo:%p failed to flush data into temp file, abort query", pQInfo);
tfree(pTree);
tfree(pTableList);
tfree(posList);
return -1;
}
tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries);
}
int64_t endt = taosGetTimestampMs();
......@@ -3519,65 +3342,16 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
qDebug("QInfo:%p result merge completed for group:%d, elapsed time:%" PRId64 " ms", pQInfo, pQInfo->groupIndex, endt - startt);
tfree(pTableList);
_end:
tfree(pTableQueryInfoList);
tfree(posList);
tfree(pTree);
// tfree(pResultInfo);
// tfree(buf);
return pQInfo->groupResInfo.numOfDataPages;
}
int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
// the base value for group result, since the maximum number of table for each vnode will not exceed 100,000.
int32_t pageId = -1;
int32_t capacity = pResultBuf->numOfRowsPerPage;
int32_t remain = (int32_t) pQuery->sdata[0]->num;
int32_t offset = 0;
while (remain > 0) {
int32_t rows = (remain > capacity)? capacity:remain;
assert(rows > 0);
// get the output buffer page
tFilePage *buf = getNewDataBuf(pResultBuf, pGroupResInfo->groupId, &pageId);
buf->num = rows;
// pagewisely copy to dest buffer
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
char* output = buf->data + pRuntimeEnv->offset[i] * pRuntimeEnv->numOfRowsPerPage;
char* src = ((char *) pQuery->sdata[i]->data) + offset * bytes;
memcpy(output, src, (size_t)(buf->num * bytes));
}
offset += rows;
remain -= rows;
pGroupResInfo->numOfDataPages += 1;
if (code != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, code);
}
return TSDB_CODE_SUCCESS;
}
void resetMergeResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx *pCtx, SResultRow *pRow) {
SQuery* pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].aOutputBuf = pQuery->sdata[k]->data - pCtx[k].outputBytes;
pCtx[k].size = 1;
pCtx[k].startOffset = 0;
pCtx[k].resultInfo = getResultCell(pRuntimeEnv, pRow, k);
pQuery->sdata[k]->num = 0;
}
return code;
}
static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *pTableQueryInfo) {
......@@ -3698,7 +3472,7 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t tid = 0;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -3830,12 +3604,8 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getResultRow(pWindowResInfo, i);
if (!pResult->closed) {
continue;
}
setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
......@@ -4119,9 +3889,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
......@@ -4134,12 +3904,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
return;
}
uint64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid);
if (pResultRow == NULL) {
return;
}
assert (pResultRow != NULL);
/*
* not assign result buffer yet, add new result buffer
......@@ -4285,7 +4053,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
/**
* In handling both ascending and descending order super table queries, we need to find the first qualified
* timestamp of this table, and then set the first qualified start timestamp.
* In ascending query, key is the first qualified timestamp. However, in the descending order query, additional
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
* operations are involved.
*/
STimeWindow w = TSWINDOW_INITIALIZER;
......@@ -4294,7 +4062,6 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
TSKEY sk = MIN(win.skey, win.ekey);
TSKEY ek = MAX(win.skey, win.ekey);
getAlignQueryTimeWindow(pQuery, win.skey, sk, ek, &w);
pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
if (!QUERY_IS_ASC_QUERY(pQuery)) {
......@@ -4333,36 +4100,33 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
return loadPrimaryTS;
}
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) {
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRow **pRows, int32_t numOfRows, int32_t *index, int32_t orderType) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t numOfResult = 0;
int32_t startIdx = 0;
int32_t start = 0;
int32_t step = -1;
qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo);
int32_t totalSet = numOfClosedResultRows(pResultInfo);
SResultRow** result = pResultInfo->pResult;
if (orderType == TSDB_ORDER_ASC) {
startIdx = pQInfo->groupIndex;
start = (*index);
step = 1;
} else { // desc order copy all data
startIdx = totalSet - pQInfo->groupIndex - 1;
start = numOfRows - (*index) - 1;
step = -1;
}
SGroupResInfo* pGroupResInfo = &pQInfo->groupResInfo;
for (int32_t i = startIdx; (i < totalSet) && (i >= 0); i += step) {
if (result[i]->numOfRows == 0) {
pQInfo->groupIndex += 1;
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
if (pRows[i]->numOfRows == 0) {
(*index) += 1;
pGroupResInfo->rowId = 0;
continue;
}
int32_t numOfRowsToCopy = result[i]->numOfRows - pGroupResInfo->rowId;
int32_t numOfRowsToCopy = pRows[i]->numOfRows - pGroupResInfo->rowId;
int32_t oldOffset = pGroupResInfo->rowId;
/*
......@@ -4374,16 +4138,16 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_
pGroupResInfo->rowId += numOfRowsToCopy;
} else {
pGroupResInfo->rowId = 0;
pQInfo->groupIndex += 1;
(*index) += 1;
}
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, result[i]->pageId);
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRows[i]->pageId);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
char *out = pQuery->sdata[j]->data + numOfResult * size;
char *in = getPosInResultPage(pRuntimeEnv, j, result[i], page);
char *in = getPosInResultPage(pRuntimeEnv, j, pRows[i], page);
memcpy(out, in + oldOffset * size, size * numOfRowsToCopy);
}
......@@ -4414,10 +4178,9 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo, orderType);
int32_t numOfResult = doCopyToSData(pQInfo, pResultInfo->pResult, pResultInfo->size, &pQInfo->groupIndex, orderType);
pQuery->rec.rows += numOfResult;
assert(pQuery->rec.rows <= pQuery->rec.capacity);
}
......@@ -4449,25 +4212,17 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
SResultRowInfo * pResultRowInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// TODO refactor
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllResultRows(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery);
}
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery));
}
}
......@@ -4801,13 +4556,10 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
getAlignQueryTimeWindow(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &w);
pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey;
}
} else {
getAlignQueryTimeWindow(pQuery, blockInfo.window.ekey, pQuery->window.ekey, blockInfo.window.ekey, &w);
pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey;
}
......@@ -5758,6 +5510,7 @@ static void doRestoreContext(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (pRuntimeEnv->pTsBuf != NULL) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
......@@ -5797,9 +5550,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
*/
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif
} else {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
}
......@@ -5844,7 +5594,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
if (mergeGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW
......@@ -6016,33 +5766,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
}
}
/*
 * Drive the scan of one table for an interval query.
 *
 * Each pass calls scanOneTableDataBlocks() with `start`, finalizes the results, and, when an
 * offset is still pending on a query without fill that has filter columns or a ts buffer, pops
 * rows from the front of the window result list and reduces the remaining offset by the same
 * amount. Passes repeat until Q_STATUS_EQUAL() matches QUERY_COMPLETED | QUERY_RESBUF_FULL.
 */
static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
  SQuery *pQuery = pRuntimeEnv->pQuery;

  do {
    scanOneTableDataBlocks(pRuntimeEnv, start);

    // a finished scan pass must never leave the query in the "not completed" state
    assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));

    finalizeQueryResult(pRuntimeEnv);

    // Without interpolation the leading records covered by the offset can simply be dropped.
    // todo handle offset, in case of top/bottom interval query
    if (pQuery->limit.offset > 0 && pQuery->fillType == TSDB_FILL_NONE &&
        (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL)) {
      // never pop more rows than numOfClosedResultRows() reports
      int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
      int32_t numToSkip   = (int32_t)(MIN(numOfClosed, pQuery->limit.offset));

      popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numToSkip);
      pQuery->limit.offset -= numToSkip;
    }
  } while (!Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_RESBUF_FULL));
}
// handle time interval query on table
static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
......@@ -6050,7 +5773,6 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
int32_t numOfFilled = 0;
TSKEY newStartKey = TSKEY_INITIAL_VAL;
// skip blocks without loading the actual data block from file if no filter condition is present
......@@ -6062,59 +5784,39 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
}
while (1) {
tableIntervalProcessImpl(pRuntimeEnv, newStartKey);
scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pQInfo->groupIndex = 0; // always start from 0
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
finalizeQueryResult(pRuntimeEnv);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
}
// skip offset result rows
pQuery->rec.rows = 0;
// no result generated, abort
if (pQuery->rec.rows == 0 || pRuntimeEnv->groupbyNormalCol) {
break;
if (pQuery->fillType == TSDB_FILL_NONE) {
// all data scanned, the group by normal column can return
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
if (pQuery->limit.offset > numOfClosed) {
return;
}
doSecondaryArithmeticProcess(pQuery);
// the offset is handled at prepare stage if no interpolation involved
if (pQuery->fillType == TSDB_FILL_NONE) {
limitResults(pRuntimeEnv);
break;
} else {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage**) pQuery->sdata);
numOfFilled = 0;
pQInfo->groupIndex = pQuery->limit.offset;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv);
break;
}
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
doSecondaryArithmeticProcess(pQuery);
// no result generated yet, continue retrieve data
pQuery->rec.rows = 0;
}
}
limitResults(pRuntimeEnv);
} else {
// all data scanned, the group by normal column can return
if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
// maxOutput <= 0, means current query does not generate any results
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
doSecondaryArithmeticProcess(pQuery);
if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) {
// skip offset result rows
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset);
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, pQuery->window.ekey);
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata);
pQuery->rec.rows = 0;
pQInfo->groupIndex = 0;
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
int32_t numOfFilled = 0;
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
doSecondaryArithmeticProcess(pQuery);
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
limitResults(pRuntimeEnv);
}
}
......@@ -6125,7 +5827,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
if (queryHasRemainResForTableQuery(pRuntimeEnv)) {
if (pQuery->fillType != TSDB_FILL_NONE) {
/*
* There are remain results that are not returned due to result interpolation
......@@ -6142,23 +5843,23 @@ static void tableQueryImpl(SQInfo *pQInfo) {
return;
} else {
pQuery->rec.rows = 0;
pQInfo->groupIndex = 0; // always start from 0
assert(pRuntimeEnv->windowResInfo.size > 0);
if (pRuntimeEnv->windowResInfo.size > 0) {
if (pQInfo->groupIndex < pRuntimeEnv->windowResInfo.size) {
copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo);
popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex);
if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
}
// no data remains
if (pRuntimeEnv->windowResInfo.size <= 0) {
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
}
if (pQuery->rec.rows > 0) {
qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pQuery->rec.rows,
pQuery->rec.total);
}
return;
}
// there are not data remains
if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) {
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total);
}
return;
}
}
......@@ -7276,6 +6977,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosHashCleanup(pQInfo->arrTableIdInfo);
taosArrayDestroy(pQInfo->groupResInfo.pRows);
pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo);
......@@ -7695,7 +7398,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(*pRsp)->completed = 1; // notify no more result to client
} else {
*continueExec = true;
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
qDebug("QInfo:%p has more results to retrieve", pQInfo);
}
return pQInfo->code;
......@@ -8060,6 +7763,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
}
\ No newline at end of file
......@@ -120,6 +120,7 @@
//}
static int32_t histogramCreateBin(SHistogramInfo* pHisto, int32_t index, double val);
static int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val);
SHistogramInfo* tHistogramCreate(int32_t numOfEntries) {
/* need one redundant slot */
......@@ -158,8 +159,8 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
}
#if defined(USE_ARRAYLIST)
int32_t idx = vnodeHistobinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries);
int32_t idx = histoBinarySearch((*pHisto)->elems, (*pHisto)->numOfEntries, val);
assert(idx >= 0 && idx <= (*pHisto)->maxEntries && (*pHisto)->elems != NULL);
if ((*pHisto)->elems[idx].val == val && idx >= 0) {
(*pHisto)->elems[idx].num += 1;
......@@ -356,7 +357,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) {
return 0;
}
int32_t vnodeHistobinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val) {
int32_t end = len - 1;
int32_t start = 0;
......@@ -466,7 +467,7 @@ void tHistogramPrint(SHistogramInfo* pHisto) {
*/
int64_t tHistogramSum(SHistogramInfo* pHisto, double v) {
#if defined(USE_ARRAYLIST)
int32_t slotIdx = vnodeHistobinarySearch(pHisto->elems, pHisto->numOfEntries, v);
int32_t slotIdx = histoBinarySearch(pHisto->elems, pHisto->numOfEntries, v);
if (pHisto->elems[slotIdx].val != v) {
slotIdx -= 1;
......
......@@ -96,8 +96,6 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0;
pResultRowInfo->startTime = TSKEY_INITIAL_VAL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
}
......@@ -110,7 +108,7 @@ void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
assert(num >= 0 && num <= numOfClosed);
int16_t type = pResultRowInfo->type;
int64_t uid = getResultInfoUId(pRuntimeEnv);
int64_t uid = 0;
char *key = NULL;
int16_t bytes = -1;
......@@ -181,11 +179,12 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]->closed) {
SResultRow* pRow = pResultRowInfo->pResult[i];
if (pRow->closed) {
continue;
}
pResultRowInfo->pResult[i]->closed = true;
pRow->closed = true;
}
}
......@@ -383,18 +382,4 @@ void* destroyResultRowPool(SResultRowPool* p) {
tfree(p);
return NULL;
}
/*
 * Select the uid associated with the result rows of the running query.
 *
 * Returns the uid of the table currently being scanned only when all of the
 * following hold:
 *   - the runtime environment is flagged as a super table query (stableQuery),
 *   - the query has a non-zero interval,
 *   - the query is not a point interpolation query, and
 *   - the query does not group by a normal column.
 * In every other case 0 is returned.
 */
uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) {
  // for simple table query, the uid is always set to be 0
  if (!pRuntimeEnv->stableQuery) {
    return 0;
  }

  SQuery* query = pRuntimeEnv->pQuery;

  // The checks keep their original order so that isPointInterpoQuery() is
  // only invoked for queries with a non-zero interval.
  if (query->interval.interval != 0 && !isPointInterpoQuery(query) && !pRuntimeEnv->groupbyNormalCol) {
    STableId* tableId = TSDB_TABLEID(query->current->pTable);
    return tableId->uid;
  }

  return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册