提交 75984ed3 编写于 作者: H Haojun Liao

[td-2895] refactor

上级 51db5f57
......@@ -214,7 +214,7 @@ typedef struct SAggFunctionInfo {
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
......@@ -247,7 +247,7 @@ extern struct SAggFunctionInfo aAggs[];
extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval);
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval);
/**
* the numOfRes should be kept, since it may be used later
......
......@@ -249,6 +249,7 @@ typedef struct SOperatorInfo {
int32_t numOfOutput;
__operator_fn_t exec;
__operator_fn_t cleanup;
struct SOperatorInfo *upstream;
} SOperatorInfo;
......@@ -292,11 +293,6 @@ typedef struct SQueryRuntimeEnv {
SGroupResInfo groupResInfo;
} SQueryRuntimeEnv;
typedef struct {
char* name;
void* info;
} SQEStage;
enum {
QUERY_RESULT_NOT_READY = 1,
QUERY_RESULT_READY = 2,
......@@ -345,11 +341,11 @@ typedef struct SQueryParam {
typedef struct STableScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
void *pQueryHandle;
int32_t numOfBlocks;
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
......@@ -359,11 +355,16 @@ typedef struct STableScanInfo {
int32_t reverseTimes; // 0 by default
SSDataBlock block;
SQLFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo;
int32_t numOfOutput;
int64_t elapsedTime;
} STableScanInfo;
typedef struct SAggOperatorInfo {
SResultRowInfo *pResultRowInfo;
SResultRowInfo resultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
......@@ -372,7 +373,9 @@ typedef struct SAggOperatorInfo {
typedef struct SArithOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx* pCtx;
SQLFunctionCtx *pCtx;
SResultRowInfo resultRowInfo;
SSDataBlock *pOutput;
} SArithOperatorInfo;
typedef struct SLimitOperatorInfo {
......@@ -388,10 +391,10 @@ typedef struct SOffsetOperatorInfo {
} SOffsetOperatorInfo;
typedef struct SHashIntervalOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
SResultRowInfo resultRowInfo;
} SHashIntervalOperatorInfo;
typedef struct SFillOperatorInfo {
......@@ -400,6 +403,12 @@ typedef struct SFillOperatorInfo {
SQueryRuntimeEnv *pRuntimeEnv;
} SFillOperatorInfo;
typedef struct SFilterOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
} SFilterOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
......
......@@ -457,7 +457,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
* @param filterCols
* @return
*/
int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
int32_t countRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return BLK_DATA_NO_NEEDED;
} else {
......@@ -465,7 +465,7 @@ int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32
}
}
int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
int32_t noDataRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_NO_NEEDED;
}
......@@ -667,16 +667,16 @@ static void sum_func_merge(SQLFunctionCtx *pCtx) {
}
}
static int32_t statisRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
static int32_t statisRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_STATIS_NEEDED;
}
static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
static int32_t dataBlockRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_ALL_NEEDED;
}
// todo: if column in current data block are null, opt for this case
static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
// todo: if column in current data block are null, opt for this case
static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -689,7 +689,7 @@ static int32_t firstFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, i
}
}
static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64) {
return BLK_DATA_NO_NEEDED;
}
......@@ -701,7 +701,7 @@ static int32_t lastFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, in
}
}
static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order == TSDB_ORDER_DESC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -717,11 +717,11 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
} else { // data in current block is not earlier than current result
return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
return (pInfo->ts <= w->skey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
}
static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
if (pCtx->order != pCtx->param[0].i64) {
return BLK_DATA_NO_NEEDED;
}
......@@ -737,7 +737,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
} else {
return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
}
......@@ -2412,7 +2412,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
}
}
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval) {
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo == NULL) {
return true;
......@@ -2427,7 +2427,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
tValuePair **pRes = (tValuePair**) pTopBotInfo->res;
if (functionId == TSDB_FUNC_TOP) {
if (pCtx->functionId == TSDB_FUNC_TOP) {
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
return GET_INT8_VAL(maxval) > pRes[0]->v.i64;
......@@ -4549,7 +4549,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
doFinalizer,
count_func_merge,
count_load_data_info,
countRequired,
},
{
// 1
......@@ -4734,7 +4734,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
spread_function_finalizer,
spread_func_merge,
count_load_data_info,
countRequired,
},
{
// 14
......@@ -4776,7 +4776,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
no_data_info,
noDataRequired,
},
{
// 17
......@@ -4804,7 +4804,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
no_data_info,
noDataRequired,
},
{
// 19
......@@ -4832,7 +4832,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
no_data_info,
noDataRequired,
},
{
// 21, column project sql function
......@@ -4860,7 +4860,7 @@ SAggFunctionInfo aAggs[] = {{
no_next_step,
doFinalizer,
copy_function,
no_data_info,
noDataRequired,
},
{
// 23
......
......@@ -175,17 +175,20 @@ static STableIdInfo createTableIdInfo(SQuery* pQuery);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(SQuery* pQuery);
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId);
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr);
static bool isFixedOutputQuery(SQuery* pQuery);
static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
//static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
......@@ -2316,13 +2319,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
// group by normal column, sliding window query, interval query are handled by interval query processor
if (!pQuery->stableQuery) { // interval (down sampling operation)
if (isFixedOutputQuery(pRuntimeEnv)) {
pRuntimeEnv->proot = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
if (isFixedOutputQuery(pQuery)) {
pRuntimeEnv->proot = createAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
pRuntimeEnv->proot = createHashIntervalAggOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
......@@ -2457,8 +2467,7 @@ bool isQueryKilled(SQInfo *pQInfo) {
void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
static bool isFixedOutputQuery(SQueryRuntimeEnv* pRuntimeEnv) {
SQuery* pQuery = pRuntimeEnv->pQuery;
static bool isFixedOutputQuery(SQuery* pQuery) {
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
return false;
}
......@@ -2802,9 +2811,9 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
int32_t numOfRows) {
static bool doDataBlockStaticFilter(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) {
return true;
}
......@@ -2864,15 +2873,6 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
}
}
if (pQuery->topBotQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
return topbot_datablock_filter(&pCtx[i], functionId, (char *)&pDataStatis[i].min, (char *)&pDataStatis[i].max);
}
}
}
return false;
}
......@@ -2963,7 +2963,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId;
(*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
(*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], &pBlockInfo->window, colId);
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break;
}
......@@ -2992,7 +2992,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
if (!doDataBlockStaticFilter(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
// current block has been discard due to filter applied
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
......@@ -3011,11 +3011,12 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
return TSDB_CODE_SUCCESS;
}
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo,
void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
SQuery *pQuery = pRuntimeEnv->pQuery;
// int64_t groupId = pQuery->current->groupIndex;
int64_t groupId = pQuery->current->groupIndex;
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQueryCostInfo* pCost = &pQInfo->summary;
......@@ -3025,71 +3026,91 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
// if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info)) {
*status = BLK_DATA_ALL_NEEDED;
// }
}
if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
// if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
// SResultRow* pResult = NULL;
//
// bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
//
// TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
// STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
// if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) {
// // todo handle error in set result for timewindow
// }
// }
//
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base;
//
// int32_t functionId = pSqlFunc->functionId;
// int32_t colId = pSqlFunc->colInfo.colId;
// (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
// if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
// break;
// }
// }
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
SResultRow* pResult = NULL;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.skey:pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, &win, masterScan, &pResult, groupId) != TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base;
int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId;
(*status) |= aAggs[functionId].dataReqFunc(&pCtx[i], &pBlock->info.window, colId);
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break;
}
}
}
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
if ((*status) == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->discardBlocks += 1;
} else if ((*status) == BLK_DATA_STATIS_NEEDED) {
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis);
if (*pStatis == NULL) { // data block statistics does not exist, load data block
*pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pCost->totalCheckedRows += pBlockInfo->rows;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pCost->totalCheckedRows += pBlock->info.rows;
}
} else {
assert((*status) == BLK_DATA_ALL_NEEDED);
// load the data block statistics to perform further filter
pCost->loadBlockStatis += 1;
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
tsdbRetrieveDataBlockStatisInfo(pQueryHandle, &pBlock->pBlockStatis);
if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
// current block has been discard due to filter applied
if (pQuery->topBotQuery && pBlock->pBlockStatis != NULL) {
bool load = false;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
load = topbot_datablock_filter(&pCtx[i], (char *)&(pBlock->pBlockStatis[i].min), (char *)&(pBlock->pBlockStatis[i].max));
if (!load) {
// current block has been discard due to filter applied
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
}
}
}
// current block has been discard due to filter applied
if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pCtx, pBlockInfo->rows)) {
pCost->discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = BLK_DATA_DISCARD;
return TSDB_CODE_SUCCESS;
}
pCost->totalCheckedRows += pBlockInfo->rows;
pCost->loadBlocks += 1;
*pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
if (*pDataBlock == NULL) {
pBlock->pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
if (pBlock->pDataBlock == NULL) {
return terrno;
}
}
......@@ -3186,7 +3207,7 @@ static void expandBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t newSize, void* q
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery;
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pQuery->groupbyColumn && !isFixedOutputQuery(pRuntimeEnv) && !isTsCompQuery(pQuery)) {
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pQuery->groupbyColumn && !isFixedOutputQuery(pQuery) && !isTsCompQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec;
int32_t remain = (int32_t)(pRec->capacity - pRec->rows);
......@@ -3684,10 +3705,10 @@ void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
initCtxOutputBuf(pRuntimeEnv, pRuntimeEnv->pCtx);
}
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SSDataBlock* pDataBlock) {
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, SSDataBlock* pDataBlock) {
int32_t tid = 0;
int64_t uid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->resultRowInfo, (char *)&tid, sizeof(tid), true, uid);
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
......@@ -4417,7 +4438,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
* In ascending query, the key is the first qualified timestamp. However, in the descending order query, additional
* operations involve.
*/
STimeWindow w = TSWINDOW_INITIALIZER;
STimeWindow w = TSWINDOW_INITIALIZER;
TSKEY sk = MIN(win.skey, win.ekey);
TSKEY ek = MAX(win.skey, win.ekey);
......@@ -5115,7 +5136,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
return TSDB_CODE_SUCCESS;
}
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pRuntimeEnv))) {
if (isSTableQuery && (!QUERY_IS_INTERVAL_QUERY(pQuery)) && (!isFixedOutputQuery(pQuery))) {
return TSDB_CODE_SUCCESS;
}
......@@ -5126,7 +5147,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
&& (cond.order == TSDB_ORDER_ASC)
&& (!QUERY_IS_INTERVAL_QUERY(pQuery))
&& (!isGroupbyColumn(pQuery->pGroupbyExpr))
&& (!isFixedOutputQuery(pRuntimeEnv))
&& (!isFixedOutputQuery(pQuery))
) {
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
......@@ -6182,8 +6203,9 @@ static SSDataBlock* doScanTableImpl(STableScanInfo *pTableScanInfo) {
// this function never returns error?
uint32_t status;
int32_t code = loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, NULL, pTableScanInfo->pQueryHandle, &pBlock->info, &pBlock->pBlockStatis,
&pBlock->pDataBlock, &status);
int32_t code =
loadDataBlockOnDemand_rv(pTableScanInfo->pRuntimeEnv, pTableScanInfo->pCtx, pTableScanInfo->pResultRowInfo,
pTableScanInfo->pQueryHandle, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTableScanInfo->pRuntimeEnv->env, code);
}
......@@ -6260,7 +6282,7 @@ static SSDataBlock* doTableScan(void* param) {
return NULL;
}
static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -6272,15 +6294,34 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle
pInfo->current = 0;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "SeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->completed = false;
pOptr->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOptr->exec = doTableScan;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqScanTableOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols;
pOperator->exec = doTableScan;
return pOptr;
return pOperator;
}
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
assert(pTableScanInfo != NULL && pDownstream != NULL);
char* name = pDownstream->name;
if (strcasecmp(name, "AggregationOp") == 0) {
SAggOperatorInfo* pAggInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pAggInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo;
} else if (strcasecmp(name, "HashIntervalAggOp") == 0){
SHashIntervalOperatorInfo* pIntervalInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pIntervalInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo;
} else {
assert(0);
}
}
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
......@@ -6324,13 +6365,13 @@ static SSDataBlock* doAggregation(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, order, pQuery->vgId);
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pCtx, pRes);
setDefaultOutputBuf(pRuntimeEnv, pCtx, &pRuntimeEnv->resultRowInfo, pRes);
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
......@@ -6368,14 +6409,17 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SQuery* pQuery = pRuntimeEnv->pQuery;
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
if (pArithInfo->pCtx == NULL) {
pArithInfo->pOutput = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
initResultRowInfo(&pArithInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
}
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes);
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, &pArithInfo->resultRowInfo, pArithInfo->pOutput);
pRuntimeEnv->pQuery->pos = 0;
pArithInfo->pOutput->info.rows = 0;
while(1) {
SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
......@@ -6387,7 +6431,7 @@ static SSDataBlock* doArithmeticOperation(void* param) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pArithInfo->pCtx[i].size = pBlock->info.rows;
if (pArithInfo->pCtx[i].functionId == TSDB_FUNC_ARITHM) {
setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, pOperator->pExpr, pBlock);
setArithParams((SArithmeticSupport*) pArithInfo->pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock);
} else {
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
......@@ -6405,13 +6449,13 @@ static SSDataBlock* doArithmeticOperation(void* param) {
}
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->pCtx, pOperator->pExpr, pOperator->numOfOutput);
pRes->info.rows += pBlock->info.rows;
if (pRes->info.rows > 4096) {
pArithInfo->pOutput->info.rows += pBlock->info.rows;
if (pArithInfo->pOutput->info.rows > 4096) {
break;
}
}
return pRes;
return pArithInfo->pOutput;
}
static SSDataBlock* doLimit(void* param) {
......@@ -6483,11 +6527,10 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
......@@ -6503,8 +6546,8 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pCtx, pBlock, order);
hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock);
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, order);
hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock);
}
pOperator->completed = true;
......@@ -6513,8 +6556,6 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
destroySQLFunctionCtx(pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes);
......@@ -6559,6 +6600,10 @@ static SSDataBlock* doFill(void* param) {
return NULL;
}
//SSDataBlock* doFilter(void* param) {
//
//}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQuery* pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
......@@ -6576,33 +6621,41 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
return;
}
if (pOperator->cleanup != NULL) {
pOperator->cleanup(pOperator->optInfo);
}
destroyOperatorInfo(pOperator->upstream);
tfree(pOperator->optInfo);
tfree(pOperator);
}
static SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pResultRowInfo = pResultRowInfo;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AggregationOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doAggregation;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
}
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
......@@ -6613,7 +6666,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doArithmeticOperation;
pOperator->pExpr = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->pExpr1:pRuntimeEnv->pQuery->pExpr2;
pOperator->numOfOutput = (pRuntimeEnv->pQuery->pExpr2 == NULL)? pRuntimeEnv->pQuery->numOfOutput:pRuntimeEnv->pQuery->numOfExpr2;
......@@ -6621,7 +6674,7 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
return pOperator;
}
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
......@@ -6632,7 +6685,7 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->name = "LimitOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doLimit;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
......@@ -6641,7 +6694,7 @@ static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo,
return pOperator;
}
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOffsetOperatorInfo* pInfo = calloc(1, sizeof(SOffsetOperatorInfo));
pInfo->offset = pRuntimeEnv->pQuery->limit.offset;
......@@ -6653,7 +6706,7 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo,
pOperator->name = "OffsetOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doOffset;
pOperator->pExpr = NULL;
pOperator->numOfOutput = 0;
......@@ -6662,19 +6715,43 @@ static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo,
return pOperator;
}
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HashIntervalAggOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doHashIntervalAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
}
UNUSED_FUNC SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "FilterOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = NULL;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->optInfo = pInfo;
......@@ -6682,7 +6759,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
return pOperator;
}
static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
......@@ -6693,7 +6770,7 @@ static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTable
pOperator->name = "FillOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->upstream = upstream;
pOperator->exec = doFill;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
......@@ -6832,6 +6909,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
#if 0
// scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
// finalizeQueryResult(pRuntimeEnv);
......@@ -6911,7 +6989,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
// group by normal column, sliding window query, interval query are handled by interval query processor
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->groupbyColumn) { // interval (down sampling operation)
tableIntervalProcess(pQInfo, item);
} else if (isFixedOutputQuery(pRuntimeEnv)) {
} else if (isFixedOutputQuery(pQuery)) {
tableAggregationProcess(pQInfo, item);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
......@@ -6984,7 +7062,7 @@ void stableQueryImpl(SQInfo *pQInfo) {
int64_t st = taosGetTimestampUs();
if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
(isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && (!pQuery->groupbyColumn))) {
multiTableQueryProcess(pQInfo);
} else {
assert(pQuery->checkResultBuf == 1 || isPointInterpoQuery(pQuery) || pQuery->groupbyColumn);
......@@ -7553,7 +7631,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
for (int32_t i = 0; i < numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
......@@ -7573,49 +7651,12 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypes[type].bytes;
// } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column
// SSchema* s = tGetTbnameColumnSchema();
// type = s->type;
// bytes = s->bytes;
// } else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
// SSchema s = tGetBlockDistColumnSchema();
// type = s.type;
// bytes = s.bytes;
// } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) {
// // it is a user-defined constant value column
// assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
//
// type = pExprs[i].base.arg[1].argType;
// bytes = pExprs[i].base.arg[1].argBytes;
//
// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
// bytes += VARSTR_HEADER_SIZE;
// }
} else {
int32_t index = pExprs[i].base.colInfo.colIndex;
assert(prevExpr[index].base.resColId == pExprs[i].base.colInfo.colId);
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
// if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
// if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
// return TSDB_CODE_QRY_INVALID_MSG;
// }
// } else {
// if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) {
// return TSDB_CODE_QRY_INVALID_MSG;
// }
// }
//
// if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) {
// SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
// type = pCol->type;
// bytes = pCol->bytes;
// } else {
// SSchema* s = tGetTbnameColumnSchema();
type = prevExpr[index].type;
bytes = prevExpr[index].bytes;
// }
type = prevExpr[index].type;
bytes = prevExpr[index].bytes;
}
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册