提交 61cb98c9 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 aaeedc65
......@@ -195,7 +195,6 @@ static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock);
static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo* pResultRowInfo,
......@@ -778,7 +777,7 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
}
static void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) {
static UNUSED_FUNC void updateResultRowIndex(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo, bool ascQuery, bool timeWindowInterpo) {
if ((pTableQueryInfo->lastKey > pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey < pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo);
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
......@@ -973,7 +972,7 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
// window start key interpolation
static UNUSED_FUNC void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
int32_t rowIndex) {
if (pDataBlock == NULL) {
return;
......@@ -1357,6 +1356,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doBlockwiseApplyFunctions_rv(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput);
}
if (pQuery->timeWindowInterpo) {
int32_t rowIndex = QUERY_IS_ASC_QUERY(pQuery)? pSDataBlock->info.rows-1:0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
}
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo,
......@@ -1436,48 +1440,6 @@ static int32_t setGroupResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResult
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) {
SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
continue;
}
int16_t colIndex = -1;
int32_t colId = pColIndex->colId;
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].colId == colId) {
colIndex = i;
break;
}
}
assert(colIndex >= 0 && colIndex < pQuery->numOfCols);
*type = pQuery->colList[colIndex].type;
*bytes = pQuery->colList[colIndex].bytes;
/*
* the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare
* stage, the remain tables may not have the required column in cache actually. So, the validation of required
* column in cache with the corresponding schema is reinforced.
*/
int32_t numOfCols = (int32_t)taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pColIndex->colId == p->info.colId) {
return p->pData;
}
}
}
return NULL;
}
static int32_t getGroupbyColumnData_rv(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock) {
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
......@@ -3630,25 +3592,6 @@ static void updateWindowResNumOfRes_rv(SQueryRuntimeEnv *pRuntimeEnv,
}
}
static UNUSED_FUNC void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SArray *pDataBlock, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SResultRowInfo * pResultRowInfo = &pTableQueryInfo->resInfo;
// pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
// if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || pQuery->groupbyColumn) {
// rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, pDataBlock);
// } else {
// blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock);
// }
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
updateResultRowIndex(pResultRowInfo, pTableQueryInfo, QUERY_IS_ASC_QUERY(pQuery), pQuery->timeWindowInterpo);
}
}
bool hasNotReturnedResults(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册