Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b7a27d6d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b7a27d6d
编写于
3月 02, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-2895] refactor.
上级
97bb9585
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
119 addition
and
1387 deletion
+119
-1387
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+119
-1387
未找到文件。
src/query/src/qExecutor.c
浏览文件 @
b7a27d6d
...
...
@@ -993,70 +993,6 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow)
return
ekey
;
}
#if 0
//todo binary search
static UNUSED_FUNC void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (colId == p->info.colId) {
return p->pData;
}
}
return NULL;
}
// todo refactor
static UNUSED_FUNC char *getDataBlock(SQuery* pQuery, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) {
if (pDataBlock == NULL) {
return NULL;
}
char *dataBlock = NULL;
int32_t functionId = pQuery->pExpr1[col].base.functionId;
if (functionId == TSDB_FUNC_ARITHM) {
sas->pArithExpr = &pQuery->pExpr1[col];
sas->offset = (QUERY_IS_ASC_QUERY(pQuery))? pQuery->pos : pQuery->pos - (size - 1);
sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols;
// here the pQuery->colList and sas->colList are identical
int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo *pColMsg = &pQuery->colList[i];
dataBlock = NULL;
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
if (pColMsg->colId == p->info.colId) {
dataBlock = p->pData;
break;
}
}
assert(dataBlock != NULL);
sas->data[i] = dataBlock; // start from the offset
}
} else { // other type of query function
SColIndex *pCol = &pQuery->pExpr1[col].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
SColIndex* pColIndex = &pQuery->pExpr1[col].base.colInfo;
SColumnInfoData *p = taosArrayGet(pDataBlock, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
dataBlock = p->pData;
} else {
dataBlock = NULL;
}
}
return dataBlock;
}
#endif
static
void
setNotInterpoWindowKey
(
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
type
)
{
if
(
type
==
RESULT_ROW_START_INTERP
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
...
...
@@ -1491,131 +1427,6 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat
}
}
#if 0
/**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv
* @param forwardStep
* @param tsCols
* @param pFields
* @param isDiskFileBlock
* @return the incremental number of output value, so it maybe 0 for fixed number of query,
* such as count/min/max etc.
*/
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SResultRowInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
TSKEY *tsCols = NULL;
if (pDataBlock != NULL) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
tsCols = (TSKEY *)(pColInfo->pData);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t prevIndex = curTimeWindowIndex(pWindowResInfo);
TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step);
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
goto _end;
}
int32_t forwardStep = 0;
int32_t startPos = pQuery->pos;
TSKEY ekey = reviseWindowEkey(pQuery, &win);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true);
// prev time window not interpolation yet.
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevIndex != -1 && prevIndex < curIndex && pQuery->timeWindowInterpo) {
for(int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow *pRes = pWindowResInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &w, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
int32_t p = QUERY_IS_ASC_QUERY(pQuery)? 0:pDataBlockInfo->rows-1;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, *(TSKEY*) pRuntimeEnv->prevRow[0], -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doBlockwiseApplyFunctions(pRuntimeEnv, &w, startPos, 0, tsCols, pDataBlockInfo->rows, pDataBlock);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
assert (ret == TSDB_CODE_SUCCESS);
}
// window start key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &win, pQuery->pos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
STimeWindow nextWin = win;
while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQuery, &nextWin, pDataBlockInfo, tsCols, searchFn, prevEndPos);
if (startPos < 0) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true);
// window start(end) key interpolation
doWindowBorderInterpolation(pRuntimeEnv, pDataBlockInfo, pDataBlock, pResult, &nextWin, startPos, forwardStep);
doBlockwiseApplyFunctions(pRuntimeEnv, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows, pDataBlock);
}
} else {
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
* since the selectivity + tag_prj query needs all parameters been set done.
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
_end:
if (pQuery->timeWindowInterpo) {
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->rows-1:0;
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, rowIndex);
}
}
#endif
static
int32_t
setGroupResultOutputBuf_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupIndex
,
int32_t
*
rowCellInfoOffset
)
{
SDiskbasedResultBuf
*
pResultBuf
=
pRuntimeEnv
->
pResultBuf
;
...
...
@@ -1754,315 +1565,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return
true
;
}
#if 0
static UNUSED_FUNC void setTimeWindowSKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->skey:win->ekey;
if (key == ts) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} else if (prevTs != INT64_MIN && ((QUERY_IS_ASC_QUERY(pQuery) && prevTs < key) || (!QUERY_IS_ASC_QUERY(pQuery) && prevTs > key))) {
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_START_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_END_INTERP);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pRuntimeEnv->pCtx[k].size = 1;
}
} else {
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
}
static UNUSED_FUNC void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY ts, int32_t offset, SResultRow* pResult, STimeWindow* win) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? win->ekey:win->skey;
doRowwiseTimeWindowInterpolation(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, key, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
pRuntimeEnv->pCtx[i].size = 0;
}
}
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SResultRowInfo *pWindowResInfo, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
int64_t groupId = item->groupIndex;
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
bool groupbyColumnValue = pQuery->groupbyColumn;
int16_t type = 0;
int16_t bytes = 0;
char *groupbyColumnData = NULL;
if (groupbyColumnValue) {
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
}
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pQuery, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]);
pCtx[k].size = 1;
}
// set the input column data
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
pFilterInfo->pData = getDataBlockImpl(pDataBlock, pFilterInfo->info.colId);
assert(pFilterInfo->pData != NULL);
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// from top to bottom in desc
// from bottom to top in asc order
if (pRuntimeEnv->pTsBuf != NULL) {
qDebug("QInfo:%p process data rows, numOfRows:%d, query order:%d, ts comp order:%d", pQInfo, pDataBlockInfo->rows,
pQuery->order.order, pRuntimeEnv->pTsBuf->cur.order);
}
int32_t offset = -1;
TSKEY prevTs = *(TSKEY*) pRuntimeEnv->prevRow[0];
int32_t prevRowIndex = -1;
for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) {
offset = GET_COL_DATA_POS(pQuery, j, step);
if (pRuntimeEnv->pTsBuf != NULL) {
int32_t ret = doTSJoinFilter(pRuntimeEnv, offset);
if (ret == TS_JOIN_TAG_NOT_EQUALS) {
break;
} else if (ret == TS_JOIN_TS_NOT_EQUALS) {
continue;
} else {
assert(ret == TS_JOIN_TS_EQUAL);
}
}
if (pQuery->numOfFilterCols > 0 && (!doFilterData(pQuery, offset))) {
continue;
}
// interval window query, decide the time window according to the primary timestamp
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
int32_t prevWindowIndex = curTimeWindowIndex(pWindowResInfo);
int64_t ts = tsCols[offset];
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { // null data, too many state code
goto _end;
}
// window start key interpolation
if (pQuery->timeWindowInterpo) {
// check for the time window end time interpolation
int32_t curIndex = curTimeWindowIndex(pWindowResInfo);
if (prevWindowIndex != -1 && prevWindowIndex < curIndex) {
for (int32_t k = prevWindowIndex; k < curIndex; ++k) {
SResultRow *pRes = pWindowResInfo->pResult[k];
if (pRes->closed) {
assert(resultRowInterpolated(pResult, RESULT_ROW_START_INTERP) && resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
continue;
}
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &pRes->win, masterScan, &pResult, groupId);
assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
setTimeWindowEKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &pRes->win);
doRowwiseApplyFunctions(pRuntimeEnv, &pRes->win, offset);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
continue;
}
}
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &win);
}
doRowwiseApplyFunctions(pRuntimeEnv, &win, offset);
int32_t index = pWindowResInfo->curIndex;
STimeWindow nextWin = win;
while (1) {
getNextTimeWindow(pQuery, &nextWin);
if ((nextWin.skey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(nextWin.ekey < pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
break;
}
if (ts < nextWin.skey || ts > nextWin.ekey) {
break;
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &nextWin, masterScan, &pResult, groupId);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
break;
}
setTimeWindowSKeyInterp(pRuntimeEnv, pDataBlock, prevTs, prevRowIndex, ts, offset, pResult, &nextWin);
doRowwiseApplyFunctions(pRuntimeEnv, &nextWin, offset);
}
// restore the index, add the result row will move the index
pWindowResInfo->curIndex = index;
} else { // other queries
// decide which group this rows belongs to according to current state value
char* val = NULL;
if (groupbyColumnValue) {
val = groupbyColumnData + bytes * offset;
if (isNull(val, type)) { // ignore the null value
continue;
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
}
if (pQuery->stabledev) {
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
pRuntimeEnv->pCtx[k].param[0].arr = NULL;
pRuntimeEnv->pCtx[k].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// todo opt perf
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t i = 0; i < numOfGroup; ++i) {
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, i);
if (memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t f = 0; f < numOfCols; ++f) {
SStddevInterResult *pres = taosArrayGet(p->pResult, f);
if (pres->colId == pQuery->pExpr1[k].base.colInfo.colId) {
pRuntimeEnv->pCtx[k].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
prevTs = tsCols[offset];
prevRowIndex = offset;
if (pRuntimeEnv->pTsBuf != NULL) {
// if timestamp filter list is empty, quit current query
if (!tsBufNextPos(pRuntimeEnv->pTsBuf)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
break;
}
}
}
_end:
assert(offset >= 0 && tsCols != NULL);
if (prevTs != INT64_MIN && prevTs != *(int64_t*)pRuntimeEnv->prevRow[0]) {
assert(prevRowIndex >= 0);
item->lastKey = prevTs + step;
}
// In case of all rows in current block are not qualified
if (pQuery->timeWindowInterpo && prevRowIndex != -1) {
saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock, prevRowIndex);
}
if (pRuntimeEnv->pTsBuf != NULL) {
item->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
}
}
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo* pResultRowInfo = &pRuntimeEnv->resultRowInfo;
// if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) {
// rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
// } else {
// blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
// }
// update the lastkey of current table for projection/aggregation query
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) {
numOfRes = pResultRowInfo->size;
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pQuery->timeWindowInterpo);
} else { // projection query
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
// update the number of output result
if (numOfRes > 0 && pQuery->checkResultBuf == 1) {
assert(numOfRes >= pRuntimeEnv->resultInfo.rows);
pRuntimeEnv->resultInfo.rows = numOfRes;
if (numOfRes >= pRuntimeEnv->resultInfo.threshold) {
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
if ((pQuery->limit.limit >= 0) && (pQuery->limit.limit + pQuery->limit.offset) <= numOfRes) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey) && (!QUERY_IS_ASC_QUERY(pQuery)))) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
}
}
return numOfRes;
}
#endif
void
setBlockStatisInfo
(
SQLFunctionCtx
*
pCtx
,
SSDataBlock
*
pSDataBlock
,
SColIndex
*
pColIndex
)
{
SDataStatis
*
pStatis
=
NULL
;
...
...
@@ -3284,125 +2786,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
return
midPos
;
}
/*
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static
void
doSetTagValueInParam
(
void
*
pTable
,
int32_t
tagColId
,
tVariant
*
tag
,
int16_t
type
,
int16_t
bytes
)
{
tVariantDestroy
(
tag
);
if
(
tagColId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
char
*
val
=
tsdbGetTableName
(
pTable
);
assert
(
val
!=
NULL
);
#if 0
static UNUSED_FUNC void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL) {
STimeWindow w = TSWINDOW_INITIALIZER;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) {
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
pWindowResInfo->prevSKey = w.skey;
} else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.ekey, pQuery->window.ekey, pBlockInfo->window.ekey, &w);
pWindowResInfo->prevSKey = w.skey;
}
}
}
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* summary = &pQInfo->summary;
qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey,
pQuery->order.order);
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_MASTER_SCAN(pRuntimeEnv)) {
pQuery->numOfCheckedBlocks += 1;
}
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
doSetInitialTimewindow(pRuntimeEnv, &blockInfo);
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
ensureOutputBuffer(pRuntimeEnv, blockInfo.rows);
SDataStatis *pStatis = NULL;
SArray * pDataBlock = NULL;
uint32_t status = 0;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
if (status == BLK_DATA_DISCARD) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
summary->totalRows += blockInfo.rows;
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
// while the output buffer is full or limit/offset is applied, query may be paused here
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL | QUERY_COMPLETED)) {
break;
}
}
if (terrno != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, terrno);
}
// if the result buffer is not full, set the query complete
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
pRuntimeEnv->resultRowInfo.curIndex = pRuntimeEnv->resultRowInfo.size - 1; // point to the last time window
}
return 0;
}
#endif
/*
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
*/
static
void
doSetTagValueInParam
(
void
*
pTable
,
int32_t
tagColId
,
tVariant
*
tag
,
int16_t
type
,
int16_t
bytes
)
{
tVariantDestroy
(
tag
);
if
(
tagColId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
char
*
val
=
tsdbGetTableName
(
pTable
);
assert
(
val
!=
NULL
);
tVariantCreateFromBinary
(
tag
,
varDataVal
(
val
),
varDataLen
(
val
),
TSDB_DATA_TYPE_BINARY
);
}
else
{
char
*
val
=
tsdbGetTableTagVal
(
pTable
,
tagColId
,
type
,
bytes
);
if
(
val
==
NULL
)
{
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
return
;
tVariantCreateFromBinary
(
tag
,
varDataVal
(
val
),
varDataLen
(
val
),
TSDB_DATA_TYPE_BINARY
);
}
else
{
char
*
val
=
tsdbGetTableTagVal
(
pTable
,
tagColId
,
type
,
bytes
);
if
(
val
==
NULL
)
{
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
return
;
}
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
...
...
@@ -3711,590 +3111,143 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultR
#endif
static
void
setupQueryRangeForReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
));
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pRuntimeEnv
,
i
);
SArray
*
tableKeyGroup
=
taosArrayGetP
(
pQuery
->
tableGroupInfo
.
pGroupList
,
i
);
size_t
t
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
t
;
++
j
)
{
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
group
,
j
);
updateTableQueryInfoForReverseScan
(
pQuery
,
pCheckInfo
);
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle
STableKeyInfo
*
pTableKeyInfo
=
taosArrayGet
(
tableKeyGroup
,
j
);
pTableKeyInfo
->
lastKey
=
pCheckInfo
->
lastKey
;
assert
(
pCheckInfo
->
pTable
==
pTableKeyInfo
->
pTable
);
}
}
}
void
switchCtxOrder
(
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SWITCH_ORDER
(
pCtx
[
i
].
order
);
}
}
#if 0
void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t tid = 0;
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->pOutput = pQuery->sdata[i]->data;
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
SResultRowCellInfo* pCellInfo = getResultCell(pRuntimeEnv, pRow, i);
RESET_RESULT_INFO(pCellInfo);
pCtx->resultInfo = pCellInfo;
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
}
memset(pQuery->sdata[i]->data, 0, (size_t)(pQuery->pExpr1[i].bytes * pRuntimeEnv->resultInfo.capacity));
}
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
#endif
int32_t
initResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
pCellInfo
=
(
SResultRowCellInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
pResultRow
->
pageId
=
-
1
;
pResultRow
->
offset
=
-
1
;
return
TSDB_CODE_SUCCESS
;
}
void
setDefaultOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pDataBlock
,
int32_t
*
rowCellInfoOffset
,
int64_t
uid
)
{
int32_t
tid
=
0
;
SResultRow
*
pRow
=
doPrepareResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
uid
);
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
SResultRowCellInfo
*
pCellInfo
=
getResultCell
(
pRow
,
i
,
rowCellInfoOffset
);
RESET_RESULT_INFO
(
pCellInfo
);
pCtx
[
i
].
resultInfo
=
pCellInfo
;
pCtx
[
i
].
pOutput
=
pData
->
pData
;
assert
(
pCtx
[
i
].
pOutput
!=
NULL
);
// set the timestamp output buffer for top/bottom/diff query
int32_t
functionId
=
pCtx
[
i
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
[
i
].
ptsOutputBuf
=
pCtx
[
0
].
pOutput
;
}
}
initCtxOutputBuf_rv
(
pCtx
,
pDataBlock
->
info
.
numOfCols
);
}
void
updateOutputBuf
(
SArithOperatorInfo
*
pInfo
,
int32_t
numOfInputRows
)
{
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SSDataBlock
*
pDataBlock
=
pBInfo
->
pRes
;
int32_t
newSize
=
pDataBlock
->
info
.
rows
+
numOfInputRows
;
if
(
pInfo
->
bufCapacity
<
newSize
)
{
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
char
*
p
=
realloc
(
pColInfo
->
pData
,
newSize
*
pColInfo
->
info
.
bytes
);
if
(
p
!=
NULL
)
{
pColInfo
->
pData
=
p
;
// it starts from the tail of the previously generated results.
pBInfo
->
pCtx
[
i
].
pOutput
=
pColInfo
->
pData
;
pInfo
->
bufCapacity
=
newSize
;
}
else
{
// longjmp
}
}
}
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
pBInfo
->
pCtx
[
i
].
pOutput
=
pColInfo
->
pData
+
pColInfo
->
info
.
bytes
*
pDataBlock
->
info
.
rows
;
// re-estabilish output buffer pointer.
int32_t
functionId
=
pBInfo
->
pCtx
[
i
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pBInfo
->
pCtx
[
i
].
ptsOutputBuf
=
pBInfo
->
pCtx
[
0
].
pOutput
;
}
}
}
#if 0
void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// reset the execution contexts
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pExpr1[j].base.functionId;
assert(functionId != TSDB_FUNC_DIFF);
// set next output position
if (IS_OUTER_FORWARD(aAggs[functionId].status)) {
pRuntimeEnv->pCtx[j].pOutput += pRuntimeEnv->pCtx[j].outputBytes * output;
}
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
/*
* NOTE: for top/bottom query, the value of first column of output (timestamp) are assigned
* in the procedure of top/bottom routine
* the output buffer in top/bottom routine is ptsOutputBuf, so we need to forward the output buffer
*
* diff function is handled in multi-output function
*/
pRuntimeEnv->pCtx[j].ptsOutputBuf = (char*)pRuntimeEnv->pCtx[j].ptsOutputBuf + TSDB_KEYSIZE * output;
}
RESET_RESULT_INFO(pRuntimeEnv->pCtx[j].resultInfo);
}
}
void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
pCtx[j].currentStage = 0;
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (pResInfo->initialized) {
continue;
}
aAggs[pCtx[j].functionId].init(&pCtx[j]);
}
}
void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pRuntimeEnv->resultInfo.rows == 0 || pQuery->limit.offset == 0) {
return;
}
if (pRuntimeEnv->resultInfo.rows <= pQuery->limit.offset) {
qDebug("QInfo:%p skip rows:%" PRId64 ", new offset:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), pRuntimeEnv->resultInfo.rows,
pQuery->limit.offset - pRuntimeEnv->resultInfo.rows);
pQuery->limit.offset -= pRuntimeEnv->resultInfo.rows;
pRuntimeEnv->resultInfo.rows = 0;
resetDefaultResInfoOutputBuf(pRuntimeEnv);
// clear the buffer full flag if exists
CLEAR_QUERY_STATUS(pQuery, QUERY_RESBUF_FULL);
} else {
int64_t numOfSkip = pQuery->limit.offset;
pRuntimeEnv->resultInfo.rows -= numOfSkip;
pQuery->limit.offset = 0;
qDebug("QInfo:%p skip row:%"PRId64", new offset:%d, numOfRows remain:%" PRIu64, GET_QINFO_ADDR(pRuntimeEnv), numOfSkip,
0, pRuntimeEnv->resultInfo.rows);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
memmove(pQuery->sdata[i]->data, (char*)pQuery->sdata[i]->data + bytes * numOfSkip, (size_t)(pRuntimeEnv->resultInfo.rows * bytes));
pRuntimeEnv->pCtx[i].pOutput = ((char*) pQuery->sdata[i]->data) + pRuntimeEnv->resultInfo.rows * bytes;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
}
}
updateNumOfResult(pRuntimeEnv, (int32_t)pRuntimeEnv->resultInfo.rows);
}
}
void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getResultRow(pWindowResInfo, i);
setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j];
if (pCtx->functionId == TSDB_FUNC_TS) { // ignore more table
continue;
}
aAggs[pCtx->functionId].xNextStep(pCtx);
}
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SQLFunctionCtx* pCtx = &pRuntimeEnv->pCtx[j];
if (pCtx->functionId == TSDB_FUNC_TS) {
continue;
}
aAggs[pCtx->functionId].xNextStep(pCtx);
}
}
}
#endif
void
initCtxOutputBuf_rv
(
SQLFunctionCtx
*
pCtx
,
int32_t
size
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
if
(
pResInfo
->
initialized
)
{
continue
;
}
aAggs
[
pCtx
[
j
].
functionId
].
init
(
&
pCtx
[
j
]);
}
}
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
)
{
if
(
status
==
QUERY_NOT_COMPLETED
)
{
pQuery
->
status
=
status
;
}
else
{
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS
(
pQuery
,
QUERY_NOT_COMPLETED
);
pQuery
->
status
|=
status
;
}
}
#if 0
bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// for each group result, call the finalize function for each column
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = getResultRow(pWindowResInfo, i);
setResultOutputBuf(pRuntimeEnv, pResult);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]);
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
toContinue |= (!pResInfo->complete);
}
}
} else {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int16_t functId = pQuery->pExpr1[j].base.functionId;
if (functId == TSDB_FUNC_TS) {
continue;
}
aAggs[functId].xNextStep(&pRuntimeEnv->pCtx[j]);
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[j]);
toContinue |= (!pResInfo->complete);
}
}
return toContinue;
}
static UNUSED_FUNC SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
assert((start <= pTableQueryInfo->lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(start >= pTableQueryInfo->lastKey && !QUERY_IS_ASC_QUERY(pQuery)));
SQueryStatusInfo info = {
.status = pQuery->status,
.windowIndex = pRuntimeEnv->resultRowInfo.curIndex,
.lastKey = start,
};
TIME_WINDOW_COPY(info.w, pQuery->window);
return info;
}
static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); // save the cursor
if (pRuntimeEnv->pTsBuf) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
}
// reverse order time range
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWITCH_ORDER(pQuery->order.order);
if (QUERY_IS_ASC_QUERY(pQuery)) {
assert(pQuery->window.skey <= pQuery->window.ekey);
} else {
assert(pQuery->window.skey >= pQuery->window.ekey);
}
SET_REVERSE_SCAN_FLAG(pRuntimeEnv);
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
switchCtxOrder(pRuntimeEnv);
disableFuncInReverseScan(pRuntimeEnv);
setupQueryRangeForReverseScan(pRuntimeEnv);
// clean unused handle
if (pRuntimeEnv->pSecQueryHandle != NULL) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
}
void disableFuncInReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, SQLFunctionCtx* pCtx,
int32_t numOfOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
// group by normal columns and interval query on normal table
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) {
// disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order, numOfOutput);
} else { // for simple result of table query,
for (int32_t j = 0; j < numOfOutput; ++j) { // todo refactor
int32_t functId = pCtx[j].functionId;
if (pCtx[j].resultInfo == NULL) {
continue; // resultInfo is NULL, means no data checked in previous scan
}
if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) ||
((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) {
pCtx[j].resultInfo->complete = false;
} else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) {
pCtx[j].resultInfo->complete = true;
}
}
}
}
#endif
static
void
setEnvBeforeReverseScan_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
pRuntimeEnv
->
pTsBuf
)
{
SWITCH_ORDER
(
pRuntimeEnv
->
pTsBuf
->
cur
.
order
);
bool
ret
=
tsBufNextPos
(
pRuntimeEnv
->
pTsBuf
);
assert
(
ret
);
}
// reverse order time range
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
SWITCH_ORDER
(
pQuery
->
order
.
order
);
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pCtx
,
numOfOutput
);
// disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput);
setupQueryRangeForReverseScan
(
pRuntimeEnv
);
}
#if 0
static UNUSED_FUNC void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SWITCH_ORDER(pQuery->order.order);
switchCtxOrder(pRuntimeEnv);
tsBufSetCursor(pRuntimeEnv->pTsBuf, &pStatus->cur);
if (pRuntimeEnv->pTsBuf) {
pRuntimeEnv->pTsBuf->cur.order = pQuery->order.order;
}
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
pTableQueryInfo->lastKey = pStatus->lastKey;
pQuery->status = pStatus->status;
pTableQueryInfo->win = pStatus->w;
pQuery->window = pTableQueryInfo->win;
}
static UNUSED_FUNC void restoreTimeWindow(STableGroupInfo* pTableGroupInfo, STsdbQueryCond* pCond) {
assert(pTableGroupInfo->numOfTables == 1);
SArray* pTableKeyGroup = taosArrayGetP(pTableGroupInfo->pGroupList, 0);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableKeyGroup, 0);
pKeyInfo->lastKey = pCond->twindow.skey;
}
static UNUSED_FUNC void handleInterpolationQuery(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->numOfCheckedBlocks > 0 || !isPointInterpoQuery(pQuery)) {
return;
}
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_PREV_ROW);
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQuery->memRef, TSDB_NEXT_ROW);
if (prev == NULL || next == NULL) {
return;
}
// setup the pCtx->start/end info and calculate the interpolation value
SColumnInfoData *startTs = taosArrayGet(prev, 0);
SColumnInfoData *endTs = taosArrayGet(next, 0);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
int32_t functionId = pQuery->pExpr1[i].base.functionId;
SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo;
if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
aAggs[functionId].xFunction(pCtx);
continue;
}
SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex);
SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex);
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pRuntimeEnv
));
assert(p->info.colId == pColIndex->colId);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pRuntimeEnv
,
i
);
SArray
*
tableKeyGroup
=
taosArrayGetP
(
pQuery
->
tableGroupInfo
.
pGroupList
,
i
);
pCtx->start.key = *(TSKEY *)startTs->pData;
pCtx->end.key = *(TSKEY *)endTs->pData;
size_t
t
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
t
;
++
j
)
{
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
group
,
j
);
updateTableQueryInfoForReverseScan
(
pQuery
,
pCheckInfo
);
if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) {
GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData);
GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData);
} else { // string pointer
pCtx->start.ptr = p->pData;
pCtx->end.ptr = n->pData;
}
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle
STableKeyInfo
*
pTableKeyInfo
=
taosArrayGet
(
tableKeyGroup
,
j
);
pTableKeyInfo
->
lastKey
=
pCheckInfo
->
lastKey
;
pCtx->param[2].i64 = (int8_t)pQuery->fillType;
pCtx->startTs = pQuery->window.skey;
if (pQuery->fillVal != NULL) {
if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
}
}
assert
(
pCheckInfo
->
pTable
==
pTableKeyInfo
->
pTable
);
}
}
}
aAggs[functionId].xFunction(pCtx);
void
switchCtxOrder
(
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SWITCH_ORDER
(
pCtx
[
i
].
order
);
}
}
int32_t
initResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
pCellInfo
=
(
SResultRowCellInfo
*
)((
char
*
)
pResultRow
+
sizeof
(
SResultRow
));
pResultRow
->
pageId
=
-
1
;
pResultRow
->
offset
=
-
1
;
return
TSDB_CODE_SUCCESS
;
}
void s
canOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery
;
S
TableQueryInfo *pTableQueryInfo = pQuery->current
;
void
s
etDefaultOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
SResultRowInfo
*
pResultRowInfo
,
SSDataBlock
*
pDataBlock
,
int32_t
*
rowCellInfoOffset
,
int64_t
uid
)
{
int32_t
tid
=
0
;
S
ResultRow
*
pRow
=
doPrepareResultRowFromKey
(
pRuntimeEnv
,
pResultRowInfo
,
(
char
*
)
&
tid
,
sizeof
(
tid
),
true
,
uid
)
;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
// store the start query position
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv, start);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
/*
* set the output buffer information and intermediate buffer
* not all queries require the interResultBuf, such as COUNT/TAGPRJ/PRJ/TAG etc.
*/
SResultRowCellInfo
*
pCellInfo
=
getResultCell
(
pRow
,
i
,
rowCellInfoOffset
);
RESET_RESULT_INFO
(
pCellInfo
);
pCtx
[
i
].
resultInfo
=
pCellInfo
;
pCtx
[
i
].
pOutput
=
pData
->
pData
;
assert
(
pCtx
[
i
].
pOutput
!=
NULL
);
if (!pQuery->groupbyColumn && pQuery->hasTagResults) {
setTagVal(pRuntimeEnv, pTableQueryInfo->pTable);
// set the timestamp output buffer for top/bottom/diff query
int32_t
functionId
=
pCtx
[
i
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pCtx
[
i
].
ptsOutputBuf
=
pCtx
[
0
].
pOutput
;
}
}
while (1) {
// doScanAllDataBlocks(pRuntimeEnv);
initCtxOutputBuf_rv
(
pCtx
,
pDataBlock
->
info
.
numOfCols
);
}
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
qstatus.status = pQuery->status;
void
updateOutputBuf
(
SArithOperatorInfo
*
pInfo
,
int32_t
numOfInputRows
)
{
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
SSDataBlock
*
pDataBlock
=
pBInfo
->
pRes
;
// do nothing if no data blocks are found qualified during scan
if (qstatus.lastKey == pTableQueryInfo->lastKey) {
qDebug("QInfo:%p no results generated in this scan", pQInfo);
}
}
int32_t
newSize
=
pDataBlock
->
info
.
rows
+
numOfInputRows
;
if
(
pInfo
->
bufCapacity
<
newSize
)
{
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
char
*
p
=
realloc
(
pColInfo
->
pData
,
newSize
*
pColInfo
->
info
.
bytes
);
if
(
p
!=
NULL
)
{
pColInfo
->
pData
=
p
;
if (!needRepeatScan(pRuntimeEnv)) {
// restore the status code and jump out of loop
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
pQuery->status = qstatus.status;
// it starts from the tail of the previously generated results.
pBInfo
->
pCtx
[
i
].
pOutput
=
pColInfo
->
pData
;
pInfo
->
bufCapacity
=
newSize
;
}
else
{
// longjmp
}
break;
}
}
if (pRuntimeEnv->pSecQueryHandle != NULL
) {
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle
);
}
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
pBInfo
->
pCtx
[
i
].
pOutput
=
pColInfo
->
pData
+
pColInfo
->
info
.
bytes
*
pDataBlock
->
info
.
rows
;
STsdbQueryCond cond = createTsdbQueryCond(pQuery, &pQuery->window);
restoreTimeWindow(&pQuery->tableGroupInfo, &cond);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQuery->tsdb, &cond, &pQuery->tableGroupInfo, pQInfo, &pQuery->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
// re-estabilish output buffer pointer.
int32_t
functionId
=
pBInfo
->
pCtx
[
i
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pBInfo
->
pCtx
[
i
].
ptsOutputBuf
=
pBInfo
->
pCtx
[
0
].
pOutput
;
}
}
}
pRuntimeEnv->resultRowInfo.curIndex = qstatus.windowIndex;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
pRuntimeEnv->scanFlag = REPEAT_SCAN;
if (pRuntimeEnv->pTsBuf) {
bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
assert(ret);
void
initCtxOutputBuf_rv
(
SQLFunctionCtx
*
pCtx
,
int32_t
size
)
{
for
(
int32_t
j
=
0
;
j
<
size
;
++
j
)
{
SResultRowCellInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
j
]);
if
(
pResInfo
->
initialized
)
{
continue
;
}
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo,
cond.twindow.skey, cond.twindow.ekey);
aAggs
[
pCtx
[
j
].
functionId
].
init
(
&
pCtx
[
j
]);
}
}
if (needReverseScan(pQuery)) {
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
void
setQueryStatus
(
SQuery
*
pQuery
,
int8_t
status
)
{
if
(
status
==
QUERY_NOT_COMPLETED
)
{
pQuery
->
status
=
status
;
}
else
{
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS
(
pQuery
,
QUERY_NOT_COMPLETED
);
pQuery
->
status
|=
status
;
}
}
// reverse scan from current position
qDebug("QInfo:%p start to reverse scan", pQInfo);
// doScanAllDataBlocks(pRuntimeEnv);
static
void
setEnvBeforeReverseScan_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
if
(
pRuntimeEnv
->
pTsBuf
)
{
SWITCH_ORDER
(
pRuntimeEnv
->
pTsBuf
->
cur
.
order
);
bool
ret
=
tsBufNextPos
(
pRuntimeEnv
->
pTsBuf
);
assert
(
ret
);
}
handleInterpolationQuery(pQInfo);
// reverse order time range
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
SWITCH_ORDER
(
pQuery
->
order
.
order
);
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pCtx
,
numOfOutput
);
// disableFuncInReverseScan(pRuntimeEnv, pResultRowInfo, pCtx, numOfOutput);
setupQueryRangeForReverseScan
(
pRuntimeEnv
);
}
#endif
void
finalizeQueryResult_rv
(
SOperatorInfo
*
pOperator
,
SQLFunctionCtx
*
pCtx
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
pOperator
->
pRuntimeEnv
;
...
...
@@ -4376,68 +3329,6 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
cleanupResultRowInfo
(
&
pTableQueryInfo
->
resInfo
);
}
#if 0
/**
* set output buffer for different group
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQueryRuntimeEnv *pRuntimeEnv, int32_t groupIndex, TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
}
int64_t uid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex,
sizeof(groupIndex), true, uid);
assert (pResultRow != NULL);
/*
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->numOfRowsPerPage) !=
TSDB_CODE_SUCCESS) {
return;
}
}
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
setResultOutputBuf(pRuntimeEnv, pResultRow);
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
pCtx->pOutput = getPosInResultPage(pRuntimeEnv, i, pResult, page);
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pRuntimeEnv->pCtx[0].pOutput;
}
/*
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx->resultInfo = getResultCell(pRuntimeEnv, pResult, i);
}
}
#endif
void
setResultRowOutputBufInitCtx
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResult
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
)
{
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
...
...
@@ -4465,7 +3356,6 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
}
void
setExecutionContext_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
,
int32_t
groupIndex
,
TSKEY
nextKey
)
{
STableQueryInfo
*
pTableQueryInfo
=
pRuntimeEnv
->
pQuery
->
current
;
...
...
@@ -4962,51 +3852,6 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
}
}
#if 0
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
while (1) {
int32_t ret = (int32_t)taosFillResultDataBlock(pFillInfo, (tFilePage**)pQuery->sdata, (int32_t)pRuntimeEnv->resultInfo.capacity);
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */
if (pQuery->limit.offset == 0) {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret);
return ret;
}
if (pQuery->limit.offset < ret) {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, remain:%" PRId64 ", new offset:%d",
pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
ret -= (int32_t)pQuery->limit.offset;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { //???pExpr1 or pExpr2
memmove(pDst[i]->data, pDst[i]->data + pQuery->pExpr1[i].bytes * pQuery->limit.offset,
ret * pQuery->pExpr1[i].bytes);
}
pQuery->limit.offset = 0;
return ret;
} else {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, "
"remain:%d, new offset:%" PRId64, pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0,
pQuery->limit.offset - ret);
pQuery->limit.offset -= ret;
ret = 0;
}
// no data in current data after fill
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pRuntimeEnv->resultInfo.capacity);
if (numOfTotal == 0) {
return 0;
}
}
}
#endif
int32_t
doFillGapsInResults_rv
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SSDataBlock
*
pOutput
)
{
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
...
...
@@ -6198,9 +5043,7 @@ static UNUSED_FUNC void sequentialTableProcess(SQInfo *pQInfo) {
}
}
#endif
#if 0
static int32_t doSaveContext(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
...
...
@@ -6265,88 +5108,6 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
// }
}
#if 0
static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
} else {
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
}
qDebug("QInfo:%p current:%"PRId64", total:%"PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total);
return;
}
qDebug("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo,
pQuery->window.skey, pQuery->window.ekey, pQuery->order.order);
// do check all qualified data blocks
int64_t el = scanMultiTableDataBlocks(pQInfo);
qDebug("QInfo:%p master scan completed, elapsed time: %" PRId64 "ms, reverse scan start", pQInfo, el);
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// close all time window results
doCloseAllTimeWindow(pQInfo);
if (needReverseScan(pQuery)) {
int32_t code = doSaveContext(pQInfo);
if (code == TSDB_CODE_SUCCESS) {
el = scanMultiTableDataBlocks(pQInfo);
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
doRestoreContext(pQInfo);
} else {
pQInfo->code = code;
}
} else {
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
}
setQueryStatus(pQuery, QUERY_COMPLETED);
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
//TODO finalizeQueryResult may cause SEGSEV, since the memory may not allocated yet, add a cleanup function instead
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
copyResToQueryResultBuf(pQInfo, pQuery);
} else { // not a interval query
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo);
}
// handle the limitation of output buffer
qDebug("QInfo:%p points returned:%" PRId64 ", total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total + pRuntimeEnv->resultInfo.rows);
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
SExprInfo* pExprInfo = (SExprInfo*) pSupport->exprList;
int32_t index = -1;
for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
if (colId == pExprInfo[i].base.resColId) {
index = i;
break;
}
}
assert(index >= 0 && index < pSupport->numOfCols);
return pSupport->data[index] + pSupport->offset * pExprInfo[index].bytes;
}
#endif
static
SSDataBlock
*
doTableScanImpl
(
STableScanInfo
*
pTableScanInfo
)
{
SSDataBlock
*
pBlock
=
&
pTableScanInfo
->
block
;
SQuery
*
pQuery
=
pTableScanInfo
->
pRuntimeEnv
->
pQuery
;
...
...
@@ -7422,23 +6183,6 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
return
pOperator
;
}
/*
* in each query, this function will be called only once, no retry for further result.
*
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
void
tableAggregationProcess
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
pQuery
->
topBotQuery
&&
pQuery
->
limit
.
offset
>
0
)
{
// no need to execute, since the output will be ignore.
return
;
}
pRuntimeEnv
->
outputBuf
=
pRuntimeEnv
->
proot
->
exec
(
pRuntimeEnv
->
proot
);
}
void
tableQueryImpl
(
SQInfo
*
pQInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
...
...
@@ -8662,18 +7406,6 @@ void freeQInfo(SQInfo *pQInfo) {
tfree
(
pQInfo
);
}
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
/*
* get the file size and set the numOfRows to be the file size, since for tsComp query,
* the returned row size is equalled to 1
* TODO handle the case that the file is too large to send back one time
*/
return
(
size_t
)(
pQuery
->
resultRowSize
*
(
*
numOfRows
));
}
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
)
{
// the remained number of retrieved rows, not the interpolated result
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录