Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9f864b74
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9f864b74
编写于
1月 11, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/qnode
上级
89dcffce
a2545549
变更
29
展开全部
隐藏空白更改
内联
并排
Showing
29 changed file
with
2280 addition
and
2006 deletion
+2280
-2006
include/common/common.h
include/common/common.h
+6
-0
include/dnode/vnode/tsdb/tsdb.h
include/dnode/vnode/tsdb/tsdb.h
+134
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+16
-14
include/libs/planner/planner.h
include/libs/planner/planner.h
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+19
-1
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+449
-439
source/dnode/vnode/impl/src/vnodeQuery.c
source/dnode/vnode/impl/src/vnodeQuery.c
+1
-1
source/dnode/vnode/tsdb/CMakeLists.txt
source/dnode/vnode/tsdb/CMakeLists.txt
+1
-0
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+1
-1
source/dnode/vnode/tsdb/src/tsdbRead.c
source/dnode/vnode/tsdb/src/tsdbRead.c
+1066
-1086
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+23
-8
source/libs/executor/inc/dataSinkMgt.h
source/libs/executor/inc/dataSinkMgt.h
+1
-1
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+24
-26
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-4
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+55
-186
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+142
-179
source/libs/executor/test/CMakeLists.txt
source/libs/executor/test/CMakeLists.txt
+18
-0
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+221
-0
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+3
-0
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+7
-4
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+10
-3
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+22
-14
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+4
-1
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+21
-5
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+7
-6
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+13
-15
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+8
-8
未找到文件。
include/common/common.h
浏览文件 @
9f864b74
...
...
@@ -62,6 +62,12 @@ typedef struct SConstantItem {
SVariant
value
;
}
SConstantItem
;
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
typedef
struct
SSDataBlock
{
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
...
...
include/dnode/vnode/tsdb/tsdb.h
浏览文件 @
9f864b74
...
...
@@ -18,6 +18,7 @@
#include "mallocator.h"
#include "meta.h"
#include "common.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -39,6 +40,10 @@ typedef struct STable {
STSchema
*
pSchema
;
}
STable
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
...
...
@@ -58,6 +63,22 @@ typedef struct STsdbCfg {
int8_t
compression
;
}
STsdbCfg
;
// query condition to build multi-table data block iterator
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
STsdbQueryCond
;
typedef
struct
{
void
*
pTable
;
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
);
void
tsdbClose
(
STsdb
*
);
...
...
@@ -70,6 +91,119 @@ int tsdbCommit(STsdb *pTsdb);
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
typedef
void
*
tsdbReadHandleT
;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReadHandleT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
void
*
pRef
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReadHandleT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
bool
isTsdbCacheLastRow
(
tsdbReadHandleT
*
pTsdbReadHandle
);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReadHandleT
*
pHandle
);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool
tsdbNextDataBlock
(
tsdbReadHandleT
pTsdbReadHandle
);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void
tsdbRetrieveDataBlockInfo
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SDataStatis
**
pBlockStatis
);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t
tsdbGetOneTableGroup
(
STsdb
*
tsdb
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
* @param queryHandle
*/
void
tsdbCleanupQueryHandle
(
tsdbReadHandleT
queryHandle
);
#ifdef __cplusplus
}
#endif
...
...
include/libs/executor/executor.h
浏览文件 @
9f864b74
...
...
@@ -22,23 +22,25 @@ extern "C" {
typedef
void
*
qTaskInfo_t
;
/**
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param pQueryTableMsg
* @param pTaskInfo
* @return
*/
int32_t
qCreateTask
(
void
*
tsdb
,
int32_t
vgId
,
void
*
pQueryTableMsg
,
qTaskInfo_t
*
pTaskInfo
,
uint64_t
qId
);
/**
* the main query execution function, including query on both table and multiple tables,
/**
* Create the exec task object according to task json
* @param tsdb
* @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo
* @param qId
* @return
*/
int32_t
qCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
);
/**
* the main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @return
*/
bool
qExecTask
(
qTaskInfo_t
q
info
,
uint64_t
*
qId
);
bool
qExecTask
(
qTaskInfo_t
q
Task
,
SSDataBlock
**
pRes
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
@@ -81,7 +83,7 @@ int32_t qKillTask(qTaskInfo_t qinfo);
* @param qinfo
* @return
*/
int32_t
qIs
Query
Completed
(
qTaskInfo_t
qinfo
);
int32_t
qIs
Task
Completed
(
qTaskInfo_t
qinfo
);
/**
* destroy query info structure
...
...
@@ -113,7 +115,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param numOfIndex
* @return
*/
int32_t
qCreateTableGroupByGroupExpr
(
SArray
*
pTableIdList
,
TSKEY
skey
,
STableGroupInfo
groupInfo
,
SColIndex
*
groupByIndex
,
int32_t
numOfIndex
);
//
int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
/**
* Update the table id list of a given query.
...
...
include/libs/planner/planner.h
浏览文件 @
9f864b74
...
...
@@ -150,7 +150,7 @@ struct SQueryNode;
* @param requestId
* @return
*/
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pQueryInfo
,
struct
SQueryDag
**
pDag
,
uint64_t
requestId
);
int32_t
qCreateQueryDag
(
const
struct
SQueryNode
*
pQueryInfo
,
struct
SQueryDag
**
pDag
,
SSchema
**
pSchema
,
uint32_t
*
numOfResCols
,
uint64_t
requestId
);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
...
...
source/client/src/clientImpl.c
浏览文件 @
9f864b74
...
...
@@ -197,7 +197,25 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
)
{
pRequest
->
type
=
pQueryNode
->
type
;
return
qCreateQueryDag
(
pQueryNode
,
pDag
,
pRequest
->
requestId
);
SSchema
*
pSchema
=
NULL
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
&
pSchema
,
&
pResInfo
->
numOfCols
,
pRequest
->
requestId
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
pResInfo
->
fields
=
calloc
(
1
,
sizeof
(
TAOS_FIELD
));
for
(
int32_t
i
=
0
;
i
<
pResInfo
->
numOfCols
;
++
i
)
{
pResInfo
->
fields
[
i
].
bytes
=
pSchema
[
i
].
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
[
i
].
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
[
i
].
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
}
}
return
code
;
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/dnode/vnode/impl/src/vnodeQuery.c
浏览文件 @
9f864b74
...
...
@@ -23,7 +23,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery);
int
vnodeProcessQueryReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vTrace
(
"query message is processed"
);
return
qWorkerProcessQueryMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
}
int
vnodeProcessFetchReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
...
...
source/dnode/vnode/tsdb/CMakeLists.txt
浏览文件 @
9f864b74
...
...
@@ -13,6 +13,7 @@ else(0)
"src/tsdbReadImpl.c"
"src/tsdbFile.c"
"src/tsdbFS.c"
"src/tsdbRead.c"
)
endif
(
0
)
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
浏览文件 @
9f864b74
...
...
@@ -1253,7 +1253,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
tsdbDebug
(
"vgId:%d
tid:%d
a block of data is written to file %s, offset %"
PRId64
tsdbDebug
(
"vgId:%d
uid:%"
PRId64
"
a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_TID
(
pTable
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
rowsToWrite
,
pBlock
->
len
,
pBlock
->
numOfCols
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
);
...
...
source/dnode/vnode/tsdb/src/tsdbRead.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/executor/CMakeLists.txt
浏览文件 @
9f864b74
aux_source_directory
(
src EXECUTOR_SRC
)
add_library
(
executor
${
EXECUTOR_SRC
}
)
#add_library(executor ${EXECUTOR_SRC})
#target_link_libraries(
# executor
# PRIVATE os util common function parser planner qcom tsdb
#)
add_library
(
executor STATIC
${
EXECUTOR_SRC
}
)
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries
(
executor
PRIVATE os util common function parser planner qcom tsdb
)
target_include_directories
(
executor
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/executor"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
executor
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/executor"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_link_libraries
(
executor
PRIVATE os util common function parser planner qcom
)
\ No newline at end of file
#if(${BUILD_TEST})
ADD_SUBDIRECTORY
(
test
)
#endif(${BUILD_TEST})
\ No newline at end of file
source/libs/executor/inc/dataSinkMgt.h
浏览文件 @
9f864b74
...
...
@@ -33,7 +33,7 @@ struct SDataSink;
struct
SSDataBlock
;
typedef
struct
SDataSinkMgtCfg
{
uint32_t
maxDataBlockNum
;
uint32_t
maxDataBlockNum
;
// todo: this should be numOfRows?
uint32_t
maxDataBlockNumPerQuery
;
}
SDataSinkMgtCfg
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
9f864b74
...
...
@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_
QID(_r) (((SQInfo*)((_r)->qinfo))->q
Id)
#define GET_
TASKID(_t) (((SExecTaskInfo*)(_t))->id.query
Id)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...
...
@@ -157,6 +157,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
);
int32_t
initUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
//
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
9f864b74
...
...
@@ -29,15 +29,8 @@
#include "tpagedfile.h"
#include "planner.h"
struct
SColumnFilterElem
;
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
...
...
@@ -51,19 +44,19 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0)
enum
{
// when
query
starts to execute, this status will set
QUERY
_NOT_COMPLETED
=
0x1u
,
// when
this task
starts to execute, this status will set
TASK
_NOT_COMPLETED
=
0x1u
,
/*
query
is over
/*
Task
is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
QUERY
_COMPLETED
=
0x2u
,
TASK
_COMPLETED
=
0x2u
,
/* when the result is not completed return to client, this status will be
* usually used in case of interval query with interpolation option
*/
QUERY
_OVER
=
0x4u
,
TASK
_OVER
=
0x4u
,
};
typedef
struct
SResultRowCell
{
...
...
@@ -129,6 +122,7 @@ typedef struct {
}
SOperatorProfResult
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
int64_t
end
;
...
...
@@ -246,13 +240,14 @@ typedef struct STaskIdInfo {
uint64_t
taskId
;
// this is a subplan id
}
STaskIdInfo
;
typedef
struct
STaskInfo
{
typedef
struct
S
Exec
TaskInfo
{
STaskIdInfo
id
;
char
*
content
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
...
...
@@ -260,8 +255,9 @@ typedef struct STaskInfo {
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char
*
sql
;
// query sql string
jmp_buf
env
;
}
STaskInfo
;
jmp_buf
env
;
//
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
typedef
struct
STaskRuntimeEnv
{
jmp_buf
env
;
...
...
@@ -269,7 +265,7 @@ typedef struct STaskRuntimeEnv {
uint32_t
status
;
// query status
void
*
qinfo
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
void
*
p
Query
Handle
;
void
*
p
TsdbRead
Handle
;
int32_t
prevGroupId
;
// previous executed group id
bool
enableGroupData
;
...
...
@@ -314,8 +310,8 @@ typedef struct SOperatorInfo {
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
SExprInfo
*
pExpr
;
STaskRuntimeEnv
*
pRuntimeEnv
;
S
TaskInfo
*
pTaskInfo
;
STaskRuntimeEnv
*
pRuntimeEnv
;
// todo remove it
S
ExecTaskInfo
*
pTaskInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
...
...
@@ -376,7 +372,7 @@ typedef struct STaskParam {
}
STaskParam
;
typedef
struct
STableScanInfo
{
void
*
p
Query
Handle
;
void
*
p
TsdbRead
Handle
;
int32_t
numOfBlocks
;
int32_t
numOfSkipped
;
int32_t
numOfBlockStatis
;
...
...
@@ -544,7 +540,7 @@ typedef struct SOrderOperatorInfo {
void
appendUpstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pUpstream
);
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbQueryHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
,
int32_t
reverseTime
);
SOperatorInfo
*
createTableScanOperator
(
void
*
pTsdbQueryHandle
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
repeatTime
);
SOperatorInfo
*
createTableScanOperator
(
void
*
pTsdbQueryHandle
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
repeatTime
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createAggregateOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
...
...
@@ -572,11 +568,11 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pUpstream
,
int32_t
numOfUpstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
createOrderOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrder
*
pOrderVal
);
SSDataBlock
*
doGlobalAggregate
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doMultiwayMergeSort
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doSLimit
(
void
*
param
,
bool
*
newgroup
);
//
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
//
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
//
SSDataBlock* doSLimit(void* param, bool* newgroup);
int32_t
doCreateFilterInfo
(
SColumnInfo
*
pCols
,
int32_t
numOfCols
,
int32_t
numOfFilterCols
,
SSingleColumnFilterInfo
**
pFilterInfo
,
uint64_t
qId
);
//
int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId);
void
doSetFilterColumnInfo
(
SSingleColumnFilterInfo
*
pFilterInfo
,
int32_t
numOfFilterCols
,
SSDataBlock
*
pBlock
);
bool
doFilterDataBlock
(
SSingleColumnFilterInfo
*
pFilterInfo
,
int32_t
numOfFilterCols
,
int32_t
numOfRows
,
int8_t
*
p
);
void
doCompactSDataBlock
(
SSDataBlock
*
pBlock
,
int32_t
numOfRows
,
int8_t
*
p
);
...
...
@@ -617,14 +613,14 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t
buildArithmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
void
*
pQueryMsg
);
bool
is
QueryKilled
(
SQInfo
*
pQ
Info
);
bool
is
TaskKilled
(
SExecTaskInfo
*
pTask
Info
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
void
destroyUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
//
void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool
isValidQInfo
(
void
*
param
);
...
...
@@ -644,5 +640,7 @@ void freeQueryAttr(STaskAttr *pQuery);
int32_t
getMaximumIdleDurationSec
();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
);
#endif // TDENGINE_EXECUTORIMPL_H
source/libs/executor/src/executil.c
浏览文件 @
9f864b74
...
...
@@ -547,7 +547,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
pTableQueryInfoList
=
malloc
(
POINTER_BYTES
*
size
);
if
(
pTableQueryInfoList
==
NULL
||
posList
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
||
pGroupResInfo
->
pRows
==
NULL
)
{
// qError("QInfo:%"PRIu64" failed alloc memory", GET_
Q
ID(pRuntimeEnv));
// qError("QInfo:%"PRIu64" failed alloc memory", GET_
TASK
ID(pRuntimeEnv));
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_end
;
}
...
...
@@ -619,7 +619,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv
int64_t
endt
=
taosGetTimestampMs
();
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_
Q
ID(pRuntimeEnv),
// qDebug("QInfo:%"PRIx64" result merge completed for group:%d, elapsed time:%" PRId64 " ms", GET_
TASK
ID(pRuntimeEnv),
// pGroupResInfo->currentGroup, endt - startt);
_end:
...
...
@@ -641,13 +641,13 @@ int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRun
break
;
}
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_
Q
ID(pRuntimeEnv), pGroupResInfo->currentGroup);
// qDebug("QInfo:%"PRIu64" no result in group %d, continue", GET_
TASK
ID(pRuntimeEnv), pGroupResInfo->currentGroup);
cleanupGroupResInfo
(
pGroupResInfo
);
incNextGroup
(
pGroupResInfo
);
}
// int64_t elapsedTime = taosGetTimestampUs() - st;
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_
Q
ID(pRuntimeEnv),
// qDebug("QInfo:%"PRIu64" merge res data into group, index:%d, total group:%d, elapsed time:%" PRId64 "us", GET_
TASK
ID(pRuntimeEnv),
// pGroupResInfo->currentGroup, pGroupResInfo->totalGroup, elapsedTime);
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/executor/src/executorimpl.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/executor/test/CMakeLists.txt
0 → 100644
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/executor/test/executorTests.cpp
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/parser/src/parser.c
浏览文件 @
9f864b74
...
...
@@ -70,6 +70,9 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
int32_t
code
=
qParserValidateSqlNode
(
&
pCxt
->
ctx
,
&
info
,
pQueryInfo
,
pCxt
->
pMsg
,
pCxt
->
msgLen
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
*
pQuery
=
(
SQueryNode
*
)
pQueryInfo
;
}
else
{
terrno
=
code
;
return
code
;
}
}
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlan.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/planner/src/planner.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/qworker/CMakeLists.txt
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworker.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
source/libs/scheduler/src/scheduler.c
浏览文件 @
9f864b74
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录