提交 1444f0b4 编写于 作者: H Haojun Liao

[td-11818] refactor

上级 f2d61c56
......@@ -20,16 +20,16 @@
extern "C" {
#endif
typedef void* qinfo_t;
typedef void* qTaskInfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param pQueryTableMsg
* @param qinfo
* @param pTaskInfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryTableMsg, qTaskInfo_t* pTaskInfo, uint64_t qId);
/**
* the main query execution function, including query on both table and multiple tables,
......@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM
* @param qinfo
* @return
*/
bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId);
/**
* Retrieve the produced results information, if current query is not paused or completed,
......@@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
* @param qinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext);
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext);
/**
*
......@@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
* @param qinfo
* @return
*/
void* qGetResultRetrieveMsg(qinfo_t qinfo);
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
/**
* kill the ongoing query and free the query handle and corresponding resources automatically
* @param qinfo qhandle
* @return
*/
int32_t qKillQuery(qinfo_t qinfo);
int32_t qKillTask(qTaskInfo_t qinfo);
/**
* return whether query is completed or not
* @param qinfo
* @return
*/
int32_t qIsQueryCompleted(qinfo_t qinfo);
int32_t qIsQueryCompleted(qTaskInfo_t qinfo);
/**
* destroy query info structure
* @param qHandle
*/
void qDestroyQueryInfo(qinfo_t qHandle);
void qDestroyTask(qTaskInfo_t qHandle);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t qGetQueriedTableUid(qinfo_t qHandle);
int64_t qGetQueriedTableUid(qTaskInfo_t qHandle);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
......@@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro
* @param type operation type: ADD|DROP
* @return
*/
int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type);
int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
......@@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type);
* @param vgId
* @return
*/
void* qOpenQueryMgmt(int32_t vgId);
void* qOpenTaskMgmt(int32_t vgId);
/**
* broadcast the close information and wait for all query stop.
* @param pExecutor
*/
void qQueryMgmtNotifyClosed(void* pExecutor);
void qTaskMgmtNotifyClosing(void* pExecutor);
/**
* Re-open the query handle management module when opening the vnode again.
......@@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor);
* Close query mgmt and clean up resources.
* @param pExecutor
*/
void qCleanupQueryMgmt(void* pExecutor);
void qCleanupTaskMgmt(void* pExecutor);
/**
* Add the query into the query mgmt object
......@@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor);
* @param qInfo
* @return
*/
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo);
/**
* acquire the query handle according to the key from query mgmt object.
......@@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
* @param key
* @return
*/
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qAcquireTask(void* pMgmt, uint64_t key);
/**
* release the query handle and decrease the reference count in cache
......@@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key);
* @param freeHandle
* @return
*/
void** qReleaseQInfo(void* pMgmt, void* pQInfo);
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
/**
* De-register the query handle from the management module and free it immediately.
......
......@@ -183,7 +183,6 @@ typedef struct tExprNode {
struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH];
// int32_t functionId;
int32_t num;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
......
......@@ -8,5 +8,5 @@ target_include_directories(
target_link_libraries(
executor
PRIVATE os util common function parser
PRIVATE os util common function parser qcom
)
\ No newline at end of file
......@@ -88,37 +88,37 @@ typedef struct SResultRowPool {
SArray* pData; // SArray<void*>
} SResultRowPool;
struct SQueryAttr;
struct SQueryRuntimeEnv;
struct STaskAttr;
struct STaskRuntimeEnv;
struct SUdfInfo;
int32_t getOutputInterResultBufSize(struct SQueryAttr* pQueryAttr);
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
size_t getResultRowSize(struct SQueryRuntimeEnv* pRuntimeEnv);
size_t getResultRowSize(struct STaskRuntimeEnv* pRuntimeEnv);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(struct SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable);
int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char* getPosInResultPage(struct SQueryAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
......@@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
......
......@@ -21,7 +21,6 @@
#include "tvariant.h"
#include "thash.h"
//#include "parser.h"
#include "executil.h"
#include "taosdef.h"
#include "tarray.h"
......@@ -166,7 +165,7 @@ typedef struct {
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct SQueryAttr {
typedef struct STaskAttr {
SLimit limit;
SLimit slimit;
......@@ -229,16 +228,16 @@ typedef struct SQueryAttr {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
SArray *pUdfInfo; // no need to free
} SQueryAttr;
} STaskAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo;
typedef struct SQueryRuntimeEnv {
typedef struct STaskRuntimeEnv {
jmp_buf env;
SQueryAttr* pQueryAttr;
STaskAttr* pQueryAttr;
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
......@@ -271,7 +270,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap;
struct SUdfInfo *pUdfInfo;
} SQueryRuntimeEnv;
} STaskRuntimeEnv;
enum {
OP_IN_EXECUTING = 1,
......@@ -287,9 +286,9 @@ typedef struct SOperatorInfo {
char *name; // name, used to show the query execution plan
void *info; // extension attribution
SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv;
STaskRuntimeEnv *pRuntimeEnv;
struct SOperatorInfo **upstream; // upstream pointer list
struct SOperatorInfo ** pDownstream; // upstream pointer list
int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator
__operator_fn_t exec;
__optr_cleanup_fn_t cleanup;
......@@ -312,8 +311,8 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
SQueryRuntimeEnv runtimeEnv;
SQueryAttr query;
STaskRuntimeEnv runtimeEnv;
STaskAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads
......@@ -325,7 +324,7 @@ typedef struct SQInfo {
SQueryCostInfo summary;
} SQInfo;
typedef struct SQueryParam {
typedef struct STaskParam {
char *sql;
char *tagCond;
char *colCond;
......@@ -345,7 +344,7 @@ typedef struct SQueryParam {
int32_t tableScanOperator;
SArray *pOperator;
struct SUdfInfo *pUdfInfo;
} SQueryParam;
} STaskParam;
typedef struct STableScanInfo {
void *pQueryHandle;
......@@ -512,34 +511,34 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......@@ -561,8 +560,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
......@@ -575,13 +574,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger);
int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
......@@ -590,9 +589,9 @@ bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQueryAttr* pQueryAttr);
bool onlyQueryTags(STaskAttr* pQueryAttr);
void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param);
......@@ -607,8 +606,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
void doDestroyTask(SQInfo *pQInfo);
void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec();
......
......@@ -30,7 +30,7 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) {
int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
......@@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
return 1;
}
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t size = 0;
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
......@@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
tfree(pResultRowInfo->pResult);
}
void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return;
}
......@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pResultRowInfo, slot)->closed = true;
}
void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) {
void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) {
if (pResultRow == NULL) {
return;
}
......@@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index,
return NULL;
}
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
size_t getResultRowSize(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
return 0;
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
}
......@@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
}
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId;
......@@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
}
}
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder;
......@@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
}
static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
if (!pGroupResInfo->ordered) {
orderTheResultRows(pRuntimeEnv);
pGroupResInfo->ordered = true;
......@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR
return TSDB_CODE_SUCCESS;
}
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr);
......@@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
return code;
}
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
......
......@@ -12,13 +12,13 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <function.h>
#include "os.h"
#include "tmsg.h"
#include "tglobal.h"
#include "ttime.h"
#include "exception.h"
#include "function.h"
#include "executorimpl.h"
#include "thash.h"
#include "function.h"
......@@ -41,11 +41,6 @@
#define MULTI_KEY_DELIM "-"
#define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\
} while (0)
enum {
TS_JOIN_TS_EQUAL = 0,
TS_JOIN_TS_NOT_EQUALS = 1,
......@@ -131,40 +126,16 @@ do { \
} \
} while (0)
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2;
}
int64_t genQueryId(void) {
int64_t uid = 0;
int64_t did = 0;//tsDnodeId;
uid = did << 54;
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
uid |= pid << 44;
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
uid |= ts << 11;
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
uid |= sid;
// //qDebug("gen qid:0x%"PRIx64, uid);
return uid;
}
static int32_t getExprFunctionId(SExprInfo *pExprInfo) {
assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
return 0;
}
static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
if (pQueryAttr->interval.intervalUnit != 'n' && pQueryAttr->interval.intervalUnit != 'y') {
tw->skey += pQueryAttr->interval.sliding * factor;
......@@ -198,28 +169,28 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx,
static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static bool hasMainOutput(SQueryAttr *pQueryAttr);
static bool hasMainOutput(STaskAttr *pQueryAttr);
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo);
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order);
//static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win);
//static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
......@@ -239,25 +210,25 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
}
}
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
static void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
SArray* getOrderCheckColumns(SQueryAttr* pQuery);
SArray* getOrderCheckColumns(STaskAttr* pQuery);
typedef struct SRowCompSupporter {
SQueryRuntimeEnv *pRuntimeEnv;
STaskRuntimeEnv *pRuntimeEnv;
int16_t dataOffset;
__compar_fn_t comFunc;
} SRowCompSupporter;
......@@ -267,7 +238,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
const SResultRow *pRow2 = (const SResultRow *)b;
SRowCompSupporter *supporter = (SRowCompSupporter *)userData;
SQueryRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv;
SFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId);
SFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
......@@ -279,7 +250,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) {
static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) {
SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
size_t size = taosArrayGetSize(columnOrderList);
taosArrayDestroy(columnOrderList);
......@@ -375,7 +346,7 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput
// return (numOfSelectivity > 0 && hasTags);
}
static bool isProjQuery(SQueryAttr *pQueryAttr) {
static bool isProjQuery(STaskAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functId != FUNCTION_PRJ && functId != FUNCTION_TAGPRJ) {
......@@ -398,7 +369,7 @@ static bool hasNull(SColIndex* pColIndex, SColumnDataAgg *pStatis) {
return true;
}
static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntimeEnv* pRuntimeEnv) {
static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, STaskRuntimeEnv* pRuntimeEnv) {
// more than the capacity, reallocate the resources
if (pResultRowInfo->size < pResultRowInfo->capacity) {
return;
......@@ -424,7 +395,7 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo->capacity = (int32_t)newCapacity;
}
static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
static bool chkResultRowFromKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
......@@ -462,7 +433,7 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p
}
static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
......@@ -536,7 +507,7 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult
return pResultRowInfo->pResult[pResultRowInfo->curPos];
}
static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
getAlignQueryTimeWindow(pQueryAttr, ts, ts, pQueryAttr->window.ekey, w);
} else {
......@@ -561,7 +532,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin
}
// get the correct time window according to the handled timestamp
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) {
STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
......@@ -609,7 +580,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
}
// get the correct time window according to the handled timestamp
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) {
STimeWindow w = {0};
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
......@@ -680,14 +651,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0;
}
static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
static bool chkWindowOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
......@@ -816,7 +787,7 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
}
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) {
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STaskAttr* pQueryAttr, TSKEY lastKey) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
closeAllResultRows(pResultRowInfo);
......@@ -827,10 +798,10 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
}
}
static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo* item = pRuntimeEnv->current;
int32_t num = -1;
......@@ -867,9 +838,9 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
return num;
}
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
bool hasAggregates = pCtx[0].isAggSet;
for (int32_t k = 0; k < numOfOutput; ++k) {
......@@ -904,7 +875,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
}
static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
static int32_t getNextQualifiedWindow(STaskAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) {
getNextTimeWindow(pQueryAttr, pNext);
......@@ -983,7 +954,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext
return startPos;
}
static FORCE_INLINE TSKEY reviseWindowEkey(SQueryAttr *pQueryAttr, STimeWindow *pWindow) {
static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr *pQueryAttr, STimeWindow *pWindow) {
TSKEY ekey = -1;
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
ekey = pWindow->ekey;
......@@ -1012,20 +983,20 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
}
}
static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
static void saveDataBlockLastRow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock,
int32_t rowIndex) {
if (pDataBlock == NULL) {
return;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < pQueryAttr->numOfCols; ++k) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k);
memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes);
}
}
static TSKEY getStartTsKey(SQueryAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) {
static TSKEY getStartTsKey(STaskAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) {
TSKEY ts = TSKEY_INITIAL_VAL;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
......@@ -1126,7 +1097,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
}
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) {
......@@ -1136,8 +1107,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
}
}
static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pQueryAttr->window.skey;
......@@ -1161,7 +1132,7 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SExprInfo* pExpr = pOperator->pExpr;
SQLFunctionCtx* pCtx = pInfo->pCtx;
......@@ -1226,8 +1197,8 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, int32_t pos,
int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
......@@ -1257,8 +1228,8 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLF
static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx,
int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) {
SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
TSKEY actualEndKey = tsCols[endRowIndex];
......@@ -1289,8 +1260,8 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFun
static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SQLFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
if (!pQueryAttr->timeWindowInterpo) {
return;
}
......@@ -1340,9 +1311,9 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
......@@ -1450,9 +1421,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
......@@ -1525,12 +1496,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
......@@ -1607,7 +1578,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
// primary timestamp column
......@@ -1692,7 +1663,7 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
}
}
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
......@@ -1746,9 +1717,9 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD
return -1;
}
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// in case of timestamp column, always generated results.
int32_t functionId = pCtx->functionId;
......@@ -1843,9 +1814,9 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) {
return TSDB_CODE_SUCCESS;
}
static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx));
if (pFuncCtx == NULL) {
......@@ -1965,9 +1936,9 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) {
static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) {
//qDebug("QInfo:0x%"PRIx64" setup runtime env", GET_QID(pRuntimeEnv));
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pQueryAttr = pQueryAttr;
......@@ -2187,8 +2158,8 @@ _clean:
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
......@@ -2197,7 +2168,7 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) {
// assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL);
}
static void destroyTsComp(SQueryRuntimeEnv *pRuntimeEnv, SQueryAttr *pQueryAttr) {
static void destroyTsComp(STaskRuntimeEnv *pRuntimeEnv, STaskAttr *pQueryAttr) {
if (pQueryAttr->tsCompQuery && pRuntimeEnv->outputBuf && pRuntimeEnv->outputBuf->pDataBlock && taosArrayGetSize(pRuntimeEnv->outputBuf->pDataBlock) > 0) {
SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0);
if (pColInfoData) {
......@@ -2210,8 +2181,8 @@ static void destroyTsComp(SQueryRuntimeEnv *pRuntimeEnv, SQueryAttr *pQueryAttr)
}
}
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo;
//qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId);
......@@ -2271,7 +2242,7 @@ bool isQueryKilled(SQInfo *pQInfo) {
void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;}
//static bool isFixedOutputQuery(SQueryAttr* pQueryAttr) {
//static bool isFixedOutputQuery(STaskAttr* pQueryAttr) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return false;
// }
......@@ -2297,7 +2268,7 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL
//}
// todo refactor with isLastRowQuery
//bool isPointInterpoQuery(SQueryAttr *pQueryAttr) {
//bool isPointInterpoQuery(STaskAttr *pQueryAttr) {
// for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// int32_t functionId = pQueryAttr->pExpr1[i].base.functionId;
// if (functionId == FUNCTION_INTERP) {
......@@ -2308,7 +2279,7 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL
// return false;
//}
static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) {
static bool isFirstLastRowQuery(STaskAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionID = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionID == FUNCTION_LAST_ROW) {
......@@ -2319,7 +2290,7 @@ static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) {
return false;
}
static bool isCachedLastQuery(SQueryAttr *pQueryAttr) {
static bool isCachedLastQuery(STaskAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
......@@ -2354,7 +2325,7 @@ static bool isCachedLastQuery(SQueryAttr *pQueryAttr) {
* The following 4 kinds of query are treated as the tags query
* tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query
*/
bool onlyQueryTags(SQueryAttr* pQueryAttr) {
bool onlyQueryTags(STaskAttr* pQueryAttr) {
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
SExprInfo* pExprInfo = &pQueryAttr->pExpr1[i];
......@@ -2373,7 +2344,7 @@ bool onlyQueryTags(SQueryAttr* pQueryAttr) {
/////////////////////////////////////////////////////////////////////////////////////////////
void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) {
void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQueryAttr->interval.sliding <= pQueryAttr->interval.interval);
win->skey = taosTimeTruncate(key, &pQueryAttr->interval, pQueryAttr->precision);
......@@ -2394,7 +2365,7 @@ void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFir
/*
* todo add more parameters to check soon..
*/
bool colIdCheck(SQueryAttr *pQueryAttr, uint64_t qId) {
bool colIdCheck(STaskAttr *pQueryAttr, uint64_t qId) {
// load data column information is incorrect
for (int32_t i = 0; i < pQueryAttr->numOfCols - 1; ++i) {
if (pQueryAttr->tableCols[i].colId == pQueryAttr->tableCols[i + 1].colId) {
......@@ -2408,7 +2379,7 @@ bool colIdCheck(SQueryAttr *pQueryAttr, uint64_t qId) {
// todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which
// the scan order is not matter
static bool onlyOneQueryType(SQueryAttr *pQueryAttr, int32_t functId, int32_t functIdDst) {
static bool onlyOneQueryType(STaskAttr *pQueryAttr, int32_t functId, int32_t functIdDst) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
......@@ -2425,13 +2396,13 @@ static bool onlyOneQueryType(SQueryAttr *pQueryAttr, int32_t functId, int32_t fu
return true;
}
static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST); }
static bool onlyFirstQuery(STaskAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST); }
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST); }
static bool onlyLastQuery(STaskAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST); }
static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
static bool notContainSessionOrStateWindow(STaskAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
static int32_t updateBlockLoadStatus(STaskAttr *pQuery, int32_t status) {
bool hasFirstLastFunc = false;
bool hasOtherFunc = false;
......@@ -2465,7 +2436,7 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
return status;
}
static void doUpdateLastKey(SQueryAttr* pQueryAttr) {
static void doUpdateLastKey(STaskAttr* pQueryAttr) {
STimeWindow* win = &pQueryAttr->window;
size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList);
......@@ -2485,7 +2456,7 @@ static void doUpdateLastKey(SQueryAttr* pQueryAttr) {
}
static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) {
SQueryAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
// in case of point-interpolation query, use asc order scan
char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64
......@@ -2580,8 +2551,8 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool
}
}
static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t MIN_ROWS_PER_PAGE = 4;
*rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
......@@ -2596,8 +2567,8 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
//static FORCE_INLINE bool doFilterByBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//
// if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
// return true;
......@@ -2606,7 +2577,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
//}
static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0};
TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
......@@ -2655,7 +2626,7 @@ static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlock
return false;
}
static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) {
static int32_t doTSJoinFilter(STaskRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf);
#if defined(_DEBUG_VIEW)
......@@ -2781,7 +2752,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
}
}
void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols,
SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
......@@ -2823,7 +2794,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
tfree(p);
}
void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) {
int32_t numOfRows = pBlock->info.rows;
int8_t *p = NULL;
......@@ -2913,13 +2884,13 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi
}
}
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pBlockAgg = NULL;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int64_t groupId = pRuntimeEnv->current->groupIndex;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
......@@ -3184,10 +3155,10 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num
}
void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
SExprInfo *pExpr = pOperatorInfo->pExpr;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SExprInfo* pExprInfo = &pExpr[0];
int32_t functionId = getExprFunctionId(pExprInfo);
......@@ -3242,7 +3213,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
}
}
void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) {
void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) {
SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo;
pBlock->info.rows = 0;
......@@ -3293,8 +3264,8 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo)
}
}
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv));
for(int32_t i = 0; i < numOfGroups; ++i) {
......@@ -3337,7 +3308,7 @@ int32_t initResultRow(SResultRow *pResultRow) {
* +------------+-------------------------------------------+-------------------------------------------+
* offset[0] offset[1] offset[2]
*/
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) {
void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) {
SQLFunctionCtx* pCtx = pInfo->pCtx;
SSDataBlock* pDataBlock = pInfo->pRes;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
......@@ -3461,7 +3432,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) {
}
}
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) {
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status) {
if (status == QUERY_NOT_COMPLETED) {
pRuntimeEnv->status = status;
} else {
......@@ -3471,8 +3442,8 @@ void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) {
}
}
static void setupEnvForReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static void setupEnvForReverseScan(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
if (pRuntimeEnv->pTsBuf) {
SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order);
......@@ -3493,8 +3464,8 @@ static void setupEnvForReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo
}
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput;
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
......@@ -3539,7 +3510,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
}
}
static bool hasMainOutput(SQueryAttr *pQueryAttr) {
static bool hasMainOutput(STaskAttr *pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
......@@ -3551,7 +3522,7 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) {
return false;
}
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) {
STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) {
STableQueryInfo *pTableQueryInfo = buf;
pTableQueryInfo->win = win;
......@@ -3602,7 +3573,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
......@@ -3635,7 +3606,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
}
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) {
// for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0;
......@@ -3659,7 +3630,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
}
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
......@@ -3675,7 +3646,7 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i
pRuntimeEnv->prevGroupId = tableGroupId;
}
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
......@@ -3698,8 +3669,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
}
}
void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SSqlExpr* pExpr = &pExprInfo->base;
// if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) &&
......@@ -3723,8 +3694,8 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr
// }
}
int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pRuntimeEnv->pTsBuf != NULL);
......@@ -3766,9 +3737,9 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag,
}
// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function.
void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) {
void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) {
#if 0
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
......@@ -3801,8 +3772,8 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
#endif
}
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
#if 0
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
......@@ -3842,8 +3813,8 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
*/
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo;
......@@ -3887,8 +3858,8 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
* @param result
*/
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
......@@ -3946,7 +3917,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
return 0;
}
static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
static void toSSDataBlock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
pBlock->info.rows = 0;
......@@ -3954,7 +3925,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
return;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock);
......@@ -3969,9 +3940,9 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
}
}
static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// update the number of result for each, only update the number of rows for the corresponding window result.
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
......@@ -4000,8 +3971,8 @@ static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows,
}
static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
......@@ -4187,7 +4158,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
}
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary;
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
......@@ -4226,8 +4197,8 @@ void queryCostStatis(SQInfo *pQInfo) {
}
}
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
......@@ -4262,8 +4233,8 @@ void queryCostStatis(SQInfo *pQInfo) {
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
//}
//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
// return;
......@@ -4301,8 +4272,8 @@ void queryCostStatis(SQInfo *pQInfo) {
// }
//}
//static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) {
// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//
// assert(pQueryAttr->limit.offset == 0);
......@@ -4352,8 +4323,8 @@ void queryCostStatis(SQInfo *pQInfo) {
// return true;
//}
//static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// assert(*start <= pRuntimeEnv->current->lastKey);
// } else {
......@@ -4463,18 +4434,18 @@ void queryCostStatis(SQInfo *pQInfo) {
//}
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
if (p->upstream == NULL) {
if (p->pDownstream == NULL) {
assert(p->numOfUpstream == 0);
}
p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfUpstream + 1));
p->upstream[p->numOfUpstream++] = pUpstream;
p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfUpstream + 1));
p->pDownstream[p->numOfUpstream++] = pUpstream;
}
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
#if 0
// TODO set the tags scan handle
if (onlyQueryTags(pQueryAttr)) {
......@@ -4533,9 +4504,9 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) {
......@@ -4620,7 +4591,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
return TSDB_CODE_SUCCESS;
}
static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) {
static void doTableQueryInfoTimeWindowCheck(STaskAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
......@@ -4634,7 +4605,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI
}
}
//STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) {
//STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
// STsdbQueryCond cond = {
// .colList = pQueryAttr->tableCols,
// .order = pQueryAttr->order.order,
......@@ -4676,7 +4647,7 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) {
// }
//}
static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = GET_TABLEGROUP(pRuntimeEnv, i);
......@@ -4694,8 +4665,8 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
STableScanInfo *pTableScanInfo = pOperator->info;
SSDataBlock *pBlock = &pTableScanInfo->block;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo;
*newgroup = false;
......@@ -4751,8 +4722,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STableScanInfo *pTableScanInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
*newgroup = false;
......@@ -4867,7 +4838,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
}
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -4891,7 +4862,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator;
}
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
......@@ -4915,7 +4886,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
return pOperator;
}
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
......@@ -4998,7 +4969,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
}
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -5019,7 +4990,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
return pOptr;
}
SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
SArray* getOrderCheckColumns(STaskAttr* pQuery) {
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
SArray* pOrderColumns = NULL;
......@@ -5058,7 +5029,7 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
return pOrderColumns;
}
SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
SArray* pOrderColumns = NULL;
......@@ -5109,7 +5080,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow);
}
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
......@@ -5177,7 +5148,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
return pOperator;
}
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
......@@ -5252,9 +5223,9 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
......@@ -5288,7 +5259,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) {
SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{
......@@ -5339,12 +5310,12 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5386,7 +5357,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SAggOperatorInfo* pAggInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
......@@ -5398,10 +5369,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
return pInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5455,7 +5426,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
......@@ -5493,9 +5464,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
......@@ -5547,13 +5518,13 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
}
SLimitOperatorInfo* pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SSDataBlock* pBlock = NULL;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
......@@ -5599,12 +5570,12 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
}
SFilterOperatorInfo* pCondInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -5631,7 +5602,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
......@@ -5641,11 +5612,11 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5690,7 +5661,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
......@@ -5701,11 +5672,11 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5749,7 +5720,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
}
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
int64_t st = taosGetTimestampUs();
......@@ -5765,10 +5736,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5809,7 +5780,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
}
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
......@@ -5820,10 +5791,10 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -5862,7 +5833,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
......@@ -5945,7 +5916,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
......@@ -5956,10 +5927,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
......@@ -6004,7 +5975,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
......@@ -6015,12 +5986,12 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
return pBInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//pQueryAttr->order.order = TSDB_ORDER_ASC;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -6062,7 +6033,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SGroupbyOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
......@@ -6073,7 +6044,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes;
}
SOperatorInfo* upstream = pOperator->upstream[0];
SOperatorInfo* upstream = pOperator->pDownstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
......@@ -6117,7 +6088,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes;
}
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) {
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
......@@ -6130,7 +6101,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryR
*newgroup = true;
}
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
......@@ -6155,16 +6126,16 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
return NULL;
}
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes;
}
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
assert(pBlock != NULL);
......@@ -6220,7 +6191,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) {
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) {
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_STDDEV || functionId == FUNCTION_PERCT) {
......@@ -6240,12 +6211,12 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->cleanup(pOperator->info, pOperator->numOfOutput);
}
if (pOperator->upstream != NULL) {
if (pOperator->pDownstream != NULL) {
for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
destroyOperatorInfo(pOperator->upstream[i]);
destroyOperatorInfo(pOperator->pDownstream[i]);
}
tfree(pOperator->upstream);
tfree(pOperator->pDownstream);
pOperator->numOfUpstream = 0;
}
......@@ -6253,10 +6224,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
tfree(pOperator);
}
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
......@@ -6353,7 +6324,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
......@@ -6379,7 +6350,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
return pOperator;
}
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
pInfo->seed = rand();
......@@ -6442,7 +6413,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3
return 0;
}
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
......@@ -6467,7 +6438,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
return pOperator;
}
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit;
......@@ -6485,7 +6456,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator;
}
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......@@ -6510,7 +6481,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
}
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......@@ -6534,7 +6505,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
return pOperator;
}
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->reptScan = false;
......@@ -6557,7 +6528,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
appendUpstream(pOperator, upstream);
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
......@@ -6583,7 +6554,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return pOperator;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......@@ -6607,7 +6578,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti
return pOperator;
}
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
......@@ -6633,14 +6604,14 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
}
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize *
(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)));
......@@ -6664,13 +6635,13 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return pOperator;
}
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->multigroupResult = multigroupResult;
{
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal);
STimeWindow w = TSWINDOW_INITIALIZER;
......@@ -6703,10 +6674,10 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator;
}
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->slimit = pQueryAttr->slimit;
......@@ -6758,7 +6729,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
return NULL;
}
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
STagScanInfo *pInfo = pOperator->info;
......@@ -6770,7 +6741,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
......@@ -6883,7 +6854,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
#endif
}
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
......@@ -6968,9 +6939,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup);
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
......@@ -7021,7 +6992,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
}
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
pInfo->totalBytes = 0;
pInfo->buf = NULL;
......@@ -7207,7 +7178,7 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t
* @param pExpr
* @return
*/
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param) {
int32_t code = TSDB_CODE_SUCCESS;
// if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) {
......@@ -7924,7 +7895,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
return NULL;
}
int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) {
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
// if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) {
// pQueryAttr->numOfFilterCols++;
......@@ -7943,7 +7914,7 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) {
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) {
assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
......@@ -7980,7 +7951,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) {
}
}
void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query
......@@ -8026,7 +7997,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
SQueryAttr* pQueryAttr = &pQInfo->query;
STaskAttr* pQueryAttr = &pQInfo->query;
pQInfo->runtimeEnv.pQueryAttr = pQueryAttr;
pQueryAttr->tableGroupInfo = *pTableGroupInfo;
......@@ -8145,7 +8116,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->window = pQueryMsg->window;
updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STimeWindow window = pQueryAttr->window;
int32_t index = 0;
......@@ -8213,7 +8184,7 @@ _cleanup_qinfo:
// filterFreeInfo(pFilters);
_cleanup:
freeQInfo(pQInfo);
doDestroyTask(pQInfo);
return NULL;
}
......@@ -8231,14 +8202,14 @@ bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger) {
int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->qinfo = pQInfo;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STSBuf *pTsBuf = NULL;
......@@ -8292,7 +8263,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
_error:
// table query ref will be decrease during error handling
freeQInfo(pQInfo);
doDestroyTask(pQInfo);
return code;
}
......@@ -8373,20 +8344,20 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
return NULL;
}
void freeQInfo(SQInfo *pQInfo) {
void doDestroyTask(SQInfo *pQInfo) {
if (!isValidQInfo(pQInfo)) {
return;
}
//qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
freeQueryAttr(pQueryAttr);
// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
......@@ -8407,8 +8378,8 @@ void freeQInfo(SQInfo *pQInfo) {
int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) {
// the remained number of retrieved rows, not the interpolated result
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
// load data from file to msg buffer
if (pQueryAttr->tsCompQuery) {
......@@ -8535,8 +8506,8 @@ int32_t checkForQueryBuf(size_t numOfTables) {
}
bool checkNeedToCompressQueryCol(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
SSDataBlock* pRes = pRuntimeEnv->outputBuf;
......@@ -8569,7 +8540,7 @@ void releaseQueryBuf(size_t numOfTables) {
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
void freeQueryAttr(SQueryAttr* pQueryAttr) {
void freeQueryAttr(STaskAttr* pQueryAttr) {
if (pQueryAttr != NULL) {
if (pQueryAttr->fillVal != NULL) {
tfree(pQueryAttr->fillVal);
......
......@@ -126,7 +126,6 @@ typedef struct SSchJob {
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册