提交 2d53617d 编写于 作者: H Haojun Liao

[td-11818] support select * on child table.

上级 506dd590
...@@ -62,6 +62,12 @@ typedef struct SConstantItem { ...@@ -62,6 +62,12 @@ typedef struct SConstantItem {
SVariant value; SVariant value;
} SConstantItem; } SConstantItem;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray *pDataBlock; // SArray<SColumnInfoData>
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mallocator.h" #include "mallocator.h"
#include "meta.h" #include "meta.h"
#include "common.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -39,6 +40,10 @@ typedef struct STable { ...@@ -39,6 +40,10 @@ typedef struct STable {
STSchema *pSchema; STSchema *pSchema;
} STable; } STable;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid #define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid #define TABLE_UID(t) (t)->uid
...@@ -58,6 +63,22 @@ typedef struct STsdbCfg { ...@@ -58,6 +63,22 @@ typedef struct STsdbCfg {
int8_t compression; int8_t compression;
} STsdbCfg; } STsdbCfg;
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
typedef struct {
void *pTable;
TSKEY lastKey;
uint64_t uid;
} STableKeyInfo;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta); STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
...@@ -70,6 +91,119 @@ int tsdbCommit(STsdb *pTsdb); ...@@ -70,6 +91,119 @@ int tsdbCommit(STsdb *pTsdb);
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
void *pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(tsdbReadHandleT* pHandle);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(tsdbReadHandleT pTsdbReadHandle);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(tsdbReadHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReadHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(tsdbReadHandleT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -22,23 +22,25 @@ extern "C" { ...@@ -22,23 +22,25 @@ extern "C" {
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
/** /**
* create the qinfo object according to QueryTableMsg * Create the exec task object according to task json
* @param tsdb * @param tsdb
* @param pQueryTableMsg * @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo * @param pTaskInfo
* @param qId
* @return * @return
*/ */
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryTableMsg, qTaskInfo_t* pTaskInfo, uint64_t qId); int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
/** /**
* the main query execution function, including query on both table and multiple tables, * the main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions * which are decided according to the tag or table name query conditions
* *
* @param qinfo * @param qinfo
* @return * @return
*/ */
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId); bool qExecTask(qTaskInfo_t qTask);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
...@@ -81,7 +83,7 @@ int32_t qKillTask(qTaskInfo_t qinfo); ...@@ -81,7 +83,7 @@ int32_t qKillTask(qTaskInfo_t qinfo);
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qIsQueryCompleted(qTaskInfo_t qinfo); int32_t qIsTaskCompleted(qTaskInfo_t qinfo);
/** /**
* destroy query info structure * destroy query info structure
...@@ -113,7 +115,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -113,7 +115,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param numOfIndex * @param numOfIndex
* @return * @return
*/ */
int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex); //int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
/** /**
* Update the table id list of a given query. * Update the table id list of a given query.
......
...@@ -150,7 +150,7 @@ struct SQueryNode; ...@@ -150,7 +150,7 @@ struct SQueryNode;
* @param requestId * @param requestId
* @return * @return
*/ */
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule
......
...@@ -197,7 +197,25 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -197,7 +197,25 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
pRequest->type = pQueryNode->type; pRequest->type = pQueryNode->type;
return qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
SSchema *pSchema = NULL;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &pResInfo->numOfCols, pRequest->requestId);
if (code != 0) {
return code;
}
if (pQueryNode->type == TSDB_SQL_SELECT) {
pResInfo->fields = calloc(1, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].type = pSchema[i].type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
}
}
return code;
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
......
此差异已折叠。
...@@ -23,7 +23,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); ...@@ -23,7 +23,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery);
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processed"); vTrace("query message is processed");
return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
...@@ -13,6 +13,7 @@ else(0) ...@@ -13,6 +13,7 @@ else(0)
"src/tsdbReadImpl.c" "src/tsdbReadImpl.c"
"src/tsdbFile.c" "src/tsdbFile.c"
"src/tsdbFS.c" "src/tsdbFS.c"
"src/tsdbRead.c"
) )
endif(0) endif(0)
......
...@@ -1253,7 +1253,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * ...@@ -1253,7 +1253,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyFirst = dataColsKeyFirst(pDataCols);
pBlock->keyLast = dataColsKeyLast(pDataCols); pBlock->keyLast = dataColsKeyLast(pDataCols);
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 tsdbDebug("vgId:%d uid:%"PRId64" a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
......
aux_source_directory(src EXECUTOR_SRC) aux_source_directory(src EXECUTOR_SRC)
add_library(executor ${EXECUTOR_SRC}) #add_library(executor ${EXECUTOR_SRC})
#target_link_libraries(
# executor
# PRIVATE os util common function parser planner qcom tsdb
#)
add_library(executor STATIC ${EXECUTOR_SRC})
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries(executor
PRIVATE os util common function parser planner qcom tsdb
)
target_include_directories( target_include_directories(
executor executor
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
target_link_libraries( #if(${BUILD_TEST})
executor ADD_SUBDIRECTORY(test)
PRIVATE os util common function parser planner qcom #endif(${BUILD_TEST})
) \ No newline at end of file
\ No newline at end of file
...@@ -33,7 +33,7 @@ struct SDataSink; ...@@ -33,7 +33,7 @@ struct SDataSink;
struct SSDataBlock; struct SSDataBlock;
typedef struct SDataSinkMgtCfg { typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNum; uint32_t maxDataBlockNum; // todo: this should be numOfRows?
uint32_t maxDataBlockNumPerQuery; uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg; } SDataSinkMgtCfg;
......
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES) #define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.queryId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...@@ -157,6 +157,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -157,6 +157,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset); int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo); //int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
...@@ -29,15 +29,8 @@ ...@@ -29,15 +29,8 @@
#include "tpagedfile.h" #include "tpagedfile.h"
#include "planner.h" #include "planner.h"
struct SColumnFilterElem; struct SColumnFilterElem;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
...@@ -51,19 +44,19 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -51,19 +44,19 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0)
enum { enum {
// when query starts to execute, this status will set // when this task starts to execute, this status will set
QUERY_NOT_COMPLETED = 0x1u, TASK_NOT_COMPLETED = 0x1u,
/* query is over /* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed * 2. when all data within queried time window, it is also denoted as query_completed
*/ */
QUERY_COMPLETED = 0x2u, TASK_COMPLETED = 0x2u,
/* when the result is not completed return to client, this status will be /* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option * usually used in case of interval query with interpolation option
*/ */
QUERY_OVER = 0x4u, TASK_OVER = 0x4u,
}; };
typedef struct SResultRowCell { typedef struct SResultRowCell {
...@@ -129,6 +122,7 @@ typedef struct { ...@@ -129,6 +122,7 @@ typedef struct {
} SOperatorProfResult; } SOperatorProfResult;
typedef struct STaskCostInfo { typedef struct STaskCostInfo {
int64_t created;
int64_t start; int64_t start;
int64_t end; int64_t end;
...@@ -246,13 +240,14 @@ typedef struct STaskIdInfo { ...@@ -246,13 +240,14 @@ typedef struct STaskIdInfo {
uint64_t taskId; // this is a subplan id uint64_t taskId; // this is a subplan id
} STaskIdInfo; } STaskIdInfo;
typedef struct STaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
char *content; char *content;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
...@@ -260,8 +255,9 @@ typedef struct STaskInfo { ...@@ -260,8 +255,9 @@ typedef struct STaskInfo {
// int32_t dataReady; // denote if query result is ready or not // int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context // void* rspContext; // response context
char *sql; // query sql string char *sql; // query sql string
jmp_buf env; jmp_buf env; //
} STaskInfo; struct SOperatorInfo *pRoot;
} SExecTaskInfo;
typedef struct STaskRuntimeEnv { typedef struct STaskRuntimeEnv {
jmp_buf env; jmp_buf env;
...@@ -269,7 +265,7 @@ typedef struct STaskRuntimeEnv { ...@@ -269,7 +265,7 @@ typedef struct STaskRuntimeEnv {
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
void* pQueryHandle; void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
bool enableGroupData; bool enableGroupData;
...@@ -314,8 +310,8 @@ typedef struct SOperatorInfo { ...@@ -314,8 +310,8 @@ typedef struct SOperatorInfo {
char *name; // name, used to show the query execution plan char *name; // name, used to show the query execution plan
void *info; // extension attribution void *info; // extension attribution
SExprInfo *pExpr; SExprInfo *pExpr;
STaskRuntimeEnv *pRuntimeEnv; STaskRuntimeEnv *pRuntimeEnv; // todo remove it
STaskInfo *pTaskInfo; SExecTaskInfo *pTaskInfo;
struct SOperatorInfo **pDownstream; // downstram pointer list struct SOperatorInfo **pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
...@@ -376,7 +372,7 @@ typedef struct STaskParam { ...@@ -376,7 +372,7 @@ typedef struct STaskParam {
} STaskParam; } STaskParam;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void *pQueryHandle; void *pTsdbReadHandle;
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t numOfSkipped; int32_t numOfSkipped;
int32_t numOfBlockStatis; int32_t numOfBlockStatis;
...@@ -544,7 +540,7 @@ typedef struct SOrderOperatorInfo { ...@@ -544,7 +540,7 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -572,11 +568,11 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -572,11 +568,11 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); //SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); //SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
SSDataBlock* doSLimit(void* param, bool* newgroup); //SSDataBlock* doSLimit(void* param, bool* newgroup);
int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); //int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
...@@ -617,14 +613,14 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); ...@@ -617,14 +613,14 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo); bool isTaskKilled(SExecTaskInfo *pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo); bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(STaskAttr* pQueryAttr); bool onlyQueryTags(STaskAttr* pQueryAttr);
void destroyUdfInfo(struct SUdfInfo* pUdfInfo); //void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param); bool isValidQInfo(void *param);
...@@ -644,5 +640,7 @@ void freeQueryAttr(STaskAttr *pQuery); ...@@ -644,5 +640,7 @@ void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H
...@@ -547,7 +547,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -547,7 +547,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
pTableQueryInfoList = malloc(POINTER_BYTES * size); pTableQueryInfoList = malloc(POINTER_BYTES * size);
if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) { if (pTableQueryInfoList == NULL || posList == NULL || pGroupResInfo->pRows == NULL || pGroupResInfo->pRows == NULL) {
// qError("QInfo:%"PRIu64" failed alloc memory", GET_QID(pRuntimeEnv)); // qError("QInfo:%"PRIu64" failed alloc memory", GET_TASKID(pRuntimeEnv));
code = TSDB_CODE_QRY_OUT_OF_MEMORY; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _end; goto _end;
} }
...@@ -619,7 +619,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv ...@@ -619,7 +619,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int64_t endt = taosGetTimestampMs(); int64_t endt = taosGetTimestampMs();
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_QID(pRuntimeEnv), // qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt); // pGroupResInfo->currentGroup, endt - startt);
_end: _end:
...@@ -641,13 +641,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun ...@@ -641,13 +641,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
break; break;
} }
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_QID(pRuntimeEnv), pGroupResInfo->currentGroup); // qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_TASKID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo(pGroupResInfo); cleanupGroupResInfo(pGroupResInfo);
incNextGroup(pGroupResInfo); incNextGroup(pGroupResInfo);
} }
// int64_t elapsedTime = taosGetTimestampUs() - st; // int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_QID(pRuntimeEnv), // qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_TASKID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime); // pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
executorTest
PUBLIC os util common transport gtest taos qcom executor function planner
)
TARGET_INCLUDE_DIRECTORIES(
executorTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/executor/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/executor/inc"
)
...@@ -70,6 +70,9 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { ...@@ -70,6 +70,9 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
int32_t code = qParserValidateSqlNode(&pCxt->ctx, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen); int32_t code = qParserValidateSqlNode(&pCxt->ctx, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
*pQuery = (SQueryNode*)pQueryInfo; *pQuery = (SQueryNode*)pQueryInfo;
} else {
terrno = code;
return code;
} }
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册