Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
00faaa41
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
00faaa41
编写于
1月 10, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9692 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
上级
d861c054
1277383d
变更
8
展开全部
显示空白变更内容
内联
并排
Showing
8 changed file
with
1247 addition
and
577 deletion
+1247
-577
include/libs/executor/executor.h
include/libs/executor/executor.h
+18
-18
include/libs/function/function.h
include/libs/function/function.h
+1
-2
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+9
-9
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+107
-76
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+12
-12
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+579
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+520
-459
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+1
-1
未找到文件。
include/libs/executor/executor.h
浏览文件 @
00faaa41
...
@@ -20,16 +20,16 @@
...
@@ -20,16 +20,16 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
typedef
void
*
q
i
nfo_t
;
typedef
void
*
q
TaskI
nfo_t
;
/**
/**
* create the qinfo object according to QueryTableMsg
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param tsdb
* @param pQueryTableMsg
* @param pQueryTableMsg
* @param
qi
nfo
* @param
pTaskI
nfo
* @return
* @return
*/
*/
int32_t
qCreate
QueryInfo
(
void
*
tsdb
,
int32_t
vgId
,
SQueryTableInfo
*
pQueryTableMsg
,
qinfo_t
*
qi
nfo
,
uint64_t
qId
);
int32_t
qCreate
Task
(
void
*
tsdb
,
int32_t
vgId
,
void
*
pQueryTableMsg
,
qTaskInfo_t
*
pTaskI
nfo
,
uint64_t
qId
);
/**
/**
* the main query execution function, including query on both table and multiple tables,
* the main query execution function, including query on both table and multiple tables,
...
@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM
...
@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM
* @param qinfo
* @param qinfo
* @return
* @return
*/
*/
bool
q
TableQuery
(
qi
nfo_t
qinfo
,
uint64_t
*
qId
);
bool
q
ExecTask
(
qTaskI
nfo_t
qinfo
,
uint64_t
*
qId
);
/**
/**
* Retrieve the produced results information, if current query is not paused or completed,
* Retrieve the produced results information, if current query is not paused or completed,
...
@@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
...
@@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
* @param qinfo
* @param qinfo
* @return
* @return
*/
*/
int32_t
qRetrieveQueryResultInfo
(
q
i
nfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
);
int32_t
qRetrieveQueryResultInfo
(
q
TaskI
nfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
);
/**
/**
*
*
...
@@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
...
@@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
* @param contLen payload length
* @param contLen payload length
* @return
* @return
*/
*/
int32_t
qDumpRetrieveResult
(
q
i
nfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
,
bool
*
continueExec
);
int32_t
qDumpRetrieveResult
(
q
TaskI
nfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
,
bool
*
continueExec
);
/**
/**
* return the transporter context (RPC)
* return the transporter context (RPC)
* @param qinfo
* @param qinfo
* @return
* @return
*/
*/
void
*
qGetResultRetrieveMsg
(
q
i
nfo_t
qinfo
);
void
*
qGetResultRetrieveMsg
(
q
TaskI
nfo_t
qinfo
);
/**
/**
* kill the ongoing query and free the query handle and corresponding resources automatically
* kill the ongoing query and free the query handle and corresponding resources automatically
* @param qinfo qhandle
* @param qinfo qhandle
* @return
* @return
*/
*/
int32_t
qKill
Query
(
qi
nfo_t
qinfo
);
int32_t
qKill
Task
(
qTaskI
nfo_t
qinfo
);
/**
/**
* return whether query is completed or not
* return whether query is completed or not
* @param qinfo
* @param qinfo
* @return
* @return
*/
*/
int32_t
qIsQueryCompleted
(
q
i
nfo_t
qinfo
);
int32_t
qIsQueryCompleted
(
q
TaskI
nfo_t
qinfo
);
/**
/**
* destroy query info structure
* destroy query info structure
* @param qHandle
* @param qHandle
*/
*/
void
qDestroy
QueryInfo
(
qi
nfo_t
qHandle
);
void
qDestroy
Task
(
qTaskI
nfo_t
qHandle
);
/**
/**
* Get the queried table uid
* Get the queried table uid
* @param qHandle
* @param qHandle
* @return
* @return
*/
*/
int64_t
qGetQueriedTableUid
(
q
i
nfo_t
qHandle
);
int64_t
qGetQueriedTableUid
(
q
TaskI
nfo_t
qHandle
);
/**
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
...
@@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro
...
@@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro
* @param type operation type: ADD|DROP
* @param type operation type: ADD|DROP
* @return
* @return
*/
*/
int32_t
qUpdateQueriedTableIdList
(
q
i
nfo_t
qinfo
,
int64_t
uid
,
int32_t
type
);
int32_t
qUpdateQueriedTableIdList
(
q
TaskI
nfo_t
qinfo
,
int64_t
uid
,
int32_t
type
);
//================================================================================================
//================================================================================================
// query handle management
// query handle management
...
@@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type);
...
@@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type);
* @param vgId
* @param vgId
* @return
* @return
*/
*/
void
*
qOpen
Query
Mgmt
(
int32_t
vgId
);
void
*
qOpen
Task
Mgmt
(
int32_t
vgId
);
/**
/**
* broadcast the close information and wait for all query stop.
* broadcast the close information and wait for all query stop.
* @param pExecutor
* @param pExecutor
*/
*/
void
q
QueryMgmtNotifyClosed
(
void
*
pExecutor
);
void
q
TaskMgmtNotifyClosing
(
void
*
pExecutor
);
/**
/**
* Re-open the query handle management module when opening the vnode again.
* Re-open the query handle management module when opening the vnode again.
...
@@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor);
...
@@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor);
* Close query mgmt and clean up resources.
* Close query mgmt and clean up resources.
* @param pExecutor
* @param pExecutor
*/
*/
void
qCleanup
Query
Mgmt
(
void
*
pExecutor
);
void
qCleanup
Task
Mgmt
(
void
*
pExecutor
);
/**
/**
* Add the query into the query mgmt object
* Add the query into the query mgmt object
...
@@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor);
...
@@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor);
* @param qInfo
* @param qInfo
* @return
* @return
*/
*/
void
**
qRegister
QInfo
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
);
void
**
qRegister
Task
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
);
/**
/**
* acquire the query handle according to the key from query mgmt object.
* acquire the query handle according to the key from query mgmt object.
...
@@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
...
@@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
* @param key
* @param key
* @return
* @return
*/
*/
void
**
qAcquire
QInfo
(
void
*
pMgmt
,
uint64_t
key
);
void
**
qAcquire
Task
(
void
*
pMgmt
,
uint64_t
key
);
/**
/**
* release the query handle and decrease the reference count in cache
* release the query handle and decrease the reference count in cache
...
@@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key);
...
@@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key);
* @param freeHandle
* @param freeHandle
* @return
* @return
*/
*/
void
**
qRelease
QInfo
(
void
*
pMgmt
,
void
*
pQInfo
);
void
**
qRelease
Task
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
);
/**
/**
* De-register the query handle from the management module and free it immediately.
* De-register the query handle from the management module and free it immediately.
...
...
include/libs/function/function.h
浏览文件 @
00faaa41
...
@@ -89,7 +89,7 @@ enum {
...
@@ -89,7 +89,7 @@ enum {
};
};
enum
{
enum
{
MA
STER_SCAN
=
0x0u
,
MA
IN_SCAN
=
0x0u
,
REVERSE_SCAN
=
0x1u
,
REVERSE_SCAN
=
0x1u
,
REPEAT_SCAN
=
0x2u
,
//repeat scan belongs to the master scan
REPEAT_SCAN
=
0x2u
,
//repeat scan belongs to the master scan
MERGE_STAGE
=
0x20u
,
MERGE_STAGE
=
0x20u
,
...
@@ -183,7 +183,6 @@ typedef struct tExprNode {
...
@@ -183,7 +183,6 @@ typedef struct tExprNode {
struct
{
// function node
struct
{
// function node
char
functionName
[
FUNCTIONS_NAME_MAX_LENGTH
];
char
functionName
[
FUNCTIONS_NAME_MAX_LENGTH
];
// int32_t functionId;
int32_t
num
;
int32_t
num
;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
...
...
source/libs/executor/inc/executil.h
浏览文件 @
00faaa41
...
@@ -88,37 +88,37 @@ typedef struct SResultRowPool {
...
@@ -88,37 +88,37 @@ typedef struct SResultRowPool {
SArray
*
pData
;
// SArray<void*>
SArray
*
pData
;
// SArray<void*>
}
SResultRowPool
;
}
SResultRowPool
;
struct
S
Query
Attr
;
struct
S
Task
Attr
;
struct
S
Query
RuntimeEnv
;
struct
S
Task
RuntimeEnv
;
struct
SUdfInfo
;
struct
SUdfInfo
;
int32_t
getOutputInterResultBufSize
(
struct
S
Query
Attr
*
pQueryAttr
);
int32_t
getOutputInterResultBufSize
(
struct
S
Task
Attr
*
pQueryAttr
);
size_t
getResultRowSize
(
struct
S
Query
RuntimeEnv
*
pRuntimeEnv
);
size_t
getResultRowSize
(
struct
S
Task
RuntimeEnv
*
pRuntimeEnv
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
);
int32_t
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
size
,
int16_t
type
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
struct
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
void
resetResultRowInfo
(
struct
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
numOfClosedResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
int32_t
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
closeResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
bool
isResultRowClosed
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
);
void
clearResultRow
(
struct
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
void
clearResultRow
(
struct
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
struct
SResultRowEntryInfo
*
getResultCell
(
const
SResultRow
*
pRow
,
int32_t
index
,
int32_t
*
offset
);
void
*
destroyQueryFuncExpr
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
void
*
destroyQueryFuncExpr
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
void
*
freeColumnInfo
(
SColumnInfo
*
pColumnInfo
,
int32_t
numOfCols
);
void
*
freeColumnInfo
(
SColumnInfo
*
pColumnInfo
,
int32_t
numOfCols
);
int32_t
getRowNumForMultioutput
(
struct
S
Query
Attr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
int32_t
getRowNumForMultioutput
(
struct
S
Task
Attr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
);
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
static
FORCE_INLINE
SResultRow
*
getResultRow
(
SResultRowInfo
*
pResultRowInfo
,
int32_t
slot
)
{
assert
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
assert
(
pResultRowInfo
!=
NULL
&&
slot
>=
0
&&
slot
<
pResultRowInfo
->
size
);
return
pResultRowInfo
->
pResult
[
slot
];
return
pResultRowInfo
->
pResult
[
slot
];
}
}
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
S
Query
Attr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
static
FORCE_INLINE
char
*
getPosInResultPage
(
struct
S
Task
Attr
*
pQueryAttr
,
SFilePage
*
page
,
int32_t
rowOffset
,
int32_t
offset
)
{
int32_t
offset
)
{
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
assert
(
rowOffset
>=
0
&&
pQueryAttr
!=
NULL
);
...
@@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
...
@@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
);
bool
incNextGroup
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
struct
S
Query
RuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
struct
S
Task
RuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
);
int32_t
initUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
int32_t
initUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
00faaa41
...
@@ -21,13 +21,14 @@
...
@@ -21,13 +21,14 @@
#include "tvariant.h"
#include "tvariant.h"
#include "thash.h"
#include "thash.h"
//#include "parser.h"
#include "executil.h"
#include "executil.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tarray.h"
#include "tarray.h"
#include "tfilter.h"
#include "tfilter.h"
#include "tlockfree.h"
#include "tlockfree.h"
#include "tpagedfile.h"
#include "tpagedfile.h"
#include "planner.h"
struct
SColumnFilterElem
;
struct
SColumnFilterElem
;
...
@@ -65,7 +66,6 @@ enum {
...
@@ -65,7 +66,6 @@ enum {
QUERY_OVER
=
0x4u
,
QUERY_OVER
=
0x4u
,
};
};
typedef
struct
SResultRowCell
{
typedef
struct
SResultRowCell
{
uint64_t
groupId
;
uint64_t
groupId
;
SResultRow
*
pRow
;
SResultRow
*
pRow
;
...
@@ -100,7 +100,7 @@ typedef struct STableQueryInfo {
...
@@ -100,7 +100,7 @@ typedef struct STableQueryInfo {
TSKEY
lastKey
;
TSKEY
lastKey
;
int32_t
groupIndex
;
// group id in table list
int32_t
groupIndex
;
// group id in table list
SVariant
tag
;
SVariant
tag
;
STimeWindow
win
;
STimeWindow
win
;
// todo remove it later
STSCursor
cur
;
STSCursor
cur
;
void
*
pTable
;
// for retrieve the page id list
void
*
pTable
;
// for retrieve the page id list
SResultRowInfo
resInfo
;
SResultRowInfo
resInfo
;
...
@@ -128,7 +128,10 @@ typedef struct {
...
@@ -128,7 +128,10 @@ typedef struct {
int64_t
sumRunTimes
;
int64_t
sumRunTimes
;
}
SOperatorProfResult
;
}
SOperatorProfResult
;
typedef
struct
SQueryCostInfo
{
typedef
struct
STaskCostInfo
{
int64_t
start
;
int64_t
end
;
uint64_t
loadStatisTime
;
uint64_t
loadStatisTime
;
uint64_t
loadFileBlockTime
;
uint64_t
loadFileBlockTime
;
uint64_t
loadDataInCacheTime
;
uint64_t
loadDataInCacheTime
;
...
@@ -150,9 +153,9 @@ typedef struct SQueryCostInfo {
...
@@ -150,9 +153,9 @@ typedef struct SQueryCostInfo {
uint64_t
hashSize
;
uint64_t
hashSize
;
uint64_t
numOfTimeWindows
;
uint64_t
numOfTimeWindows
;
SArray
*
queryProfEvents
;
//SArray<SQueryProfEvent>
SArray
*
queryProfEvents
;
//SArray<SQueryProfEvent>
SHashObj
*
operatorProfResults
;
//map<operator_type, SQueryProfEvent>
SHashObj
*
operatorProfResults
;
//map<operator_type, SQueryProfEvent>
}
S
Query
CostInfo
;
}
S
Task
CostInfo
;
typedef
struct
{
typedef
struct
{
int64_t
vgroupLimit
;
int64_t
vgroupLimit
;
...
@@ -166,7 +169,7 @@ typedef struct {
...
@@ -166,7 +169,7 @@ typedef struct {
// The basic query information extracted from the SQueryInfo tree to support the
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
// execution of query in a data node.
typedef
struct
S
Query
Attr
{
typedef
struct
S
Task
Attr
{
SLimit
limit
;
SLimit
limit
;
SLimit
slimit
;
SLimit
slimit
;
...
@@ -229,16 +232,40 @@ typedef struct SQueryAttr {
...
@@ -229,16 +232,40 @@ typedef struct SQueryAttr {
STableGroupInfo
tableGroupInfo
;
// table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo
tableGroupInfo
;
// table <tid, last_key> list SArray<STableKeyInfo>
int32_t
vgId
;
int32_t
vgId
;
SArray
*
pUdfInfo
;
// no need to free
SArray
*
pUdfInfo
;
// no need to free
}
S
Query
Attr
;
}
S
Task
Attr
;
typedef
SSDataBlock
*
(
*
__operator_fn_t
)(
void
*
param
,
bool
*
newgroup
);
typedef
SSDataBlock
*
(
*
__operator_fn_t
)(
void
*
param
,
bool
*
newgroup
);
typedef
void
(
*
__optr_cleanup_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
void
(
*
__optr_cleanup_fn_t
)(
void
*
param
,
int32_t
num
);
struct
SOperatorInfo
;
struct
SOperatorInfo
;
typedef
struct
SQueryRuntimeEnv
{
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
uint64_t
taskId
;
// this is a subplan id
}
STaskIdInfo
;
typedef
struct
STaskInfo
{
STaskIdInfo
id
;
char
*
content
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char
*
sql
;
// query sql string
jmp_buf
env
;
}
STaskInfo
;
typedef
struct
STaskRuntimeEnv
{
jmp_buf
env
;
jmp_buf
env
;
S
QueryAttr
*
pQueryAttr
;
S
TaskAttr
*
pQueryAttr
;
uint32_t
status
;
// query status
uint32_t
status
;
// query status
void
*
qinfo
;
void
*
qinfo
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
uint8_t
scanFlag
;
// denotes reversed scan of data or not
...
@@ -271,7 +298,7 @@ typedef struct SQueryRuntimeEnv {
...
@@ -271,7 +298,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo
resultInfo
;
SRspResultInfo
resultInfo
;
SHashObj
*
pTableRetrieveTsMap
;
SHashObj
*
pTableRetrieveTsMap
;
struct
SUdfInfo
*
pUdfInfo
;
struct
SUdfInfo
*
pUdfInfo
;
}
S
Query
RuntimeEnv
;
}
S
Task
RuntimeEnv
;
enum
{
enum
{
OP_IN_EXECUTING
=
1
,
OP_IN_EXECUTING
=
1
,
...
@@ -287,10 +314,11 @@ typedef struct SOperatorInfo {
...
@@ -287,10 +314,11 @@ typedef struct SOperatorInfo {
char
*
name
;
// name, used to show the query execution plan
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
void
*
info
;
// extension attribution
SExprInfo
*
pExpr
;
SExprInfo
*
pExpr
;
SQueryRuntimeEnv
*
pRuntimeEnv
;
STaskRuntimeEnv
*
pRuntimeEnv
;
STaskInfo
*
pTaskInfo
;
struct
SOperatorInfo
**
upstream
;
// upstre
am pointer list
struct
SOperatorInfo
**
pDownstream
;
// downstr
am pointer list
int32_t
numOf
Upstream
;
// number of up
stream. The value is always ONE expect for join operator
int32_t
numOf
Downstream
;
// number of down
stream. The value is always ONE expect for join operator
__operator_fn_t
exec
;
__operator_fn_t
exec
;
__optr_cleanup_fn_t
cleanup
;
__optr_cleanup_fn_t
cleanup
;
}
SOperatorInfo
;
}
SOperatorInfo
;
...
@@ -312,8 +340,8 @@ typedef struct SQInfo {
...
@@ -312,8 +340,8 @@ typedef struct SQInfo {
int32_t
code
;
// error code to returned to client
int32_t
code
;
// error code to returned to client
int64_t
owner
;
// if it is in execution
int64_t
owner
;
// if it is in execution
S
Query
RuntimeEnv
runtimeEnv
;
S
Task
RuntimeEnv
runtimeEnv
;
S
Query
Attr
query
;
S
Task
Attr
query
;
void
*
pBuf
;
// allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
void
*
pBuf
;
// allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
...
@@ -322,10 +350,10 @@ typedef struct SQInfo {
...
@@ -322,10 +350,10 @@ typedef struct SQInfo {
void
*
rspContext
;
// response context
void
*
rspContext
;
// response context
int64_t
startExecTs
;
// start to exec timestamp
int64_t
startExecTs
;
// start to exec timestamp
char
*
sql
;
// query sql string
char
*
sql
;
// query sql string
S
QueryCostInfo
summary
;
S
TaskCostInfo
summary
;
}
SQInfo
;
}
SQInfo
;
typedef
struct
S
Query
Param
{
typedef
struct
S
Task
Param
{
char
*
sql
;
char
*
sql
;
char
*
tagCond
;
char
*
tagCond
;
char
*
colCond
;
char
*
colCond
;
...
@@ -345,7 +373,7 @@ typedef struct SQueryParam {
...
@@ -345,7 +373,7 @@ typedef struct SQueryParam {
int32_t
tableScanOperator
;
int32_t
tableScanOperator
;
SArray
*
pOperator
;
SArray
*
pOperator
;
struct
SUdfInfo
*
pUdfInfo
;
struct
SUdfInfo
*
pUdfInfo
;
}
S
Query
Param
;
}
S
Task
Param
;
typedef
struct
STableScanInfo
{
typedef
struct
STableScanInfo
{
void
*
pQueryHandle
;
void
*
pQueryHandle
;
...
@@ -366,9 +394,12 @@ typedef struct STableScanInfo {
...
@@ -366,9 +394,12 @@ typedef struct STableScanInfo {
SSDataBlock
block
;
SSDataBlock
block
;
int32_t
numOfOutput
;
int32_t
numOfOutput
;
int64_t
elapsedTime
;
int64_t
elapsedTime
;
int32_t
tableIndex
;
int32_t
tableIndex
;
int32_t
prevGroupId
;
// previous table group id
int32_t
prevGroupId
;
// previous table group id
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
STimeWindow
window
;
}
STableScanInfo
;
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
typedef
struct
STagScanInfo
{
...
@@ -512,34 +543,34 @@ typedef struct SOrderOperatorInfo {
...
@@ -512,34 +543,34 @@ typedef struct SOrderOperatorInfo {
void
appendUpstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pUpstream
);
void
appendUpstream
(
SOperatorInfo
*
p
,
SOperatorInfo
*
pUpstream
);
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbQueryHandle
,
S
Query
RuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
,
int32_t
reverseTime
);
SOperatorInfo
*
createDataBlocksOptScanInfo
(
void
*
pTsdbQueryHandle
,
S
Task
RuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
,
int32_t
reverseTime
);
SOperatorInfo
*
createTableScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
repeatTime
);
SOperatorInfo
*
createTableScanOperator
(
void
*
pTsdbQueryHandle
,
int32_t
order
,
int32_t
numOfOutput
,
int32_t
repeatTime
);
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
S
Query
RuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
S
Task
RuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createAggregateOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAggregateOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createProjectOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createProjectOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createLimitOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
SOperatorInfo
*
createLimitOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
);
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTimeIntervalOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSWindowOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createFillOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
bool
multigroupResult
);
SOperatorInfo
*
createFillOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
bool
multigroupResult
);
SOperatorInfo
*
createGroupbyOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createGroupbyOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createMultiTableTimeIntervalOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createAllMultiTableTimeIntervalOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTagScanOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTagScanOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createDistinctOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createDistinctOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
S
Query
RuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
S
Task
RuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
numOfRows
,
void
*
merger
);
int32_t
numOfRows
,
void
*
merger
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
,
bool
groupResultMixedUp
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
,
bool
groupResultMixedUp
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSLimitOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
merger
,
bool
multigroupResult
);
SOperatorInfo
*
createSLimitOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
merger
,
bool
multigroupResult
);
SOperatorInfo
*
createFilterOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
SOperatorInfo
*
createFilterOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SColumnInfo
*
pCols
,
int32_t
numOfFilter
);
int32_t
numOfOutput
,
SColumnInfo
*
pCols
,
int32_t
numOfFilter
);
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pUpstream
,
int32_t
numOfUpstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
createJoinOperatorInfo
(
SOperatorInfo
**
pUpstream
,
int32_t
numOfUpstream
,
SSchema
*
pSchema
,
int32_t
numOfOutput
);
SOperatorInfo
*
createOrderOperatorInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrder
*
pOrderVal
);
SOperatorInfo
*
createOrderOperatorInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SOrder
*
pOrderVal
);
SSDataBlock
*
doGlobalAggregate
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doGlobalAggregate
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doMultiwayMergeSort
(
void
*
param
,
bool
*
newgroup
);
SSDataBlock
*
doMultiwayMergeSort
(
void
*
param
,
bool
*
newgroup
);
...
@@ -561,8 +592,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
...
@@ -561,8 +592,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
void
clearOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
);
void
clearOutputBuf
(
SOptrBasicInfo
*
pBInfo
,
int32_t
*
bufCapacity
);
void
copyTsColoum
(
SSDataBlock
*
pRes
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
copyTsColoum
(
SSDataBlock
*
pRes
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
freeParam
(
S
Query
Param
*
param
);
void
freeParam
(
S
Task
Param
*
param
);
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
S
Query
Param
*
param
);
int32_t
convertQueryMsg
(
SQueryTableMsg
*
pQueryMsg
,
S
Task
Param
*
param
);
int32_t
createQueryFunc
(
SQueriedTableInfo
*
pTableInfo
,
int32_t
numOfOutput
,
SExprInfo
**
pExprInfo
,
int32_t
createQueryFunc
(
SQueriedTableInfo
*
pTableInfo
,
int32_t
numOfOutput
,
SExprInfo
**
pExprInfo
,
SSqlExpr
**
pExprMsg
,
SColumnInfo
*
pTagCols
,
int32_t
queryType
,
void
*
pMsg
,
struct
SUdfInfo
*
pUdfInfo
);
SSqlExpr
**
pExprMsg
,
SColumnInfo
*
pTagCols
,
int32_t
queryType
,
void
*
pMsg
,
struct
SUdfInfo
*
pUdfInfo
);
...
@@ -575,13 +606,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
...
@@ -575,13 +606,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SQInfo
*
createQInfoImpl
(
SQueryTableMsg
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
);
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
);
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
S
Query
Param
*
param
,
char
*
start
,
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
S
Task
Param
*
param
,
char
*
start
,
int32_t
prevResultLen
,
void
*
merger
);
int32_t
prevResultLen
,
void
*
merger
);
int32_t
createFilterInfo
(
S
Query
Attr
*
pQueryAttr
,
uint64_t
qId
);
int32_t
createFilterInfo
(
S
Task
Attr
*
pQueryAttr
,
uint64_t
qId
);
void
freeColumnFilterInfo
(
SColumnFilterInfo
*
pFilter
,
int32_t
numOfFilters
);
void
freeColumnFilterInfo
(
SColumnFilterInfo
*
pFilter
,
int32_t
numOfFilters
);
STableQueryInfo
*
createTableQueryInfo
(
S
Query
Attr
*
pQueryAttr
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
);
STableQueryInfo
*
createTableQueryInfo
(
S
Task
Attr
*
pQueryAttr
,
void
*
pTable
,
bool
groupbyColumn
,
STimeWindow
win
,
void
*
buf
);
STableQueryInfo
*
createTmpTableQueryInfo
(
STimeWindow
win
);
STableQueryInfo
*
createTmpTableQueryInfo
(
STimeWindow
win
);
int32_t
buildArithmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
void
*
pQueryMsg
);
int32_t
buildArithmeticExprFromMsg
(
SExprInfo
*
pArithExprInfo
,
void
*
pQueryMsg
);
...
@@ -590,9 +621,9 @@ bool isQueryKilled(SQInfo *pQInfo);
...
@@ -590,9 +621,9 @@ bool isQueryKilled(SQInfo *pQInfo);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
bool
checkNeedToCompressQueryCol
(
SQInfo
*
pQInfo
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
bool
doBuildResCheck
(
SQInfo
*
pQInfo
);
void
setQueryStatus
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
void
setQueryStatus
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
int8_t
status
);
bool
onlyQueryTags
(
S
Query
Attr
*
pQueryAttr
);
bool
onlyQueryTags
(
S
Task
Attr
*
pQueryAttr
);
void
destroyUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
void
destroyUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
bool
isValidQInfo
(
void
*
param
);
bool
isValidQInfo
(
void
*
param
);
...
@@ -607,8 +638,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
...
@@ -607,8 +638,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void
calculateOperatorProfResults
(
SQInfo
*
pQInfo
);
void
calculateOperatorProfResults
(
SQInfo
*
pQInfo
);
void
queryCostStatis
(
SQInfo
*
pQInfo
);
void
queryCostStatis
(
SQInfo
*
pQInfo
);
void
freeQInfo
(
SQInfo
*
pQInfo
);
void
doDestroyTask
(
SQInfo
*
pQInfo
);
void
freeQueryAttr
(
S
Query
Attr
*
pQuery
);
void
freeQueryAttr
(
S
Task
Attr
*
pQuery
);
int32_t
getMaximumIdleDurationSec
();
int32_t
getMaximumIdleDurationSec
();
...
...
source/libs/executor/src/executil.c
浏览文件 @
00faaa41
...
@@ -30,7 +30,7 @@ typedef struct SCompSupporter {
...
@@ -30,7 +30,7 @@ typedef struct SCompSupporter {
int32_t
order
;
int32_t
order
;
}
SCompSupporter
;
}
SCompSupporter
;
int32_t
getRowNumForMultioutput
(
S
Query
Attr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
)
{
int32_t
getRowNumForMultioutput
(
S
Task
Attr
*
pQueryAttr
,
bool
topBottomQuery
,
bool
stable
)
{
if
(
pQueryAttr
&&
(
!
stable
))
{
if
(
pQueryAttr
&&
(
!
stable
))
{
for
(
int16_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
for
(
int16_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
...
@@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
...
@@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
return
1
;
return
1
;
}
}
int32_t
getOutputInterResultBufSize
(
S
Query
Attr
*
pQueryAttr
)
{
int32_t
getOutputInterResultBufSize
(
S
Task
Attr
*
pQueryAttr
)
{
int32_t
size
=
0
;
int32_t
size
=
0
;
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
...
@@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
...
@@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
tfree
(
pResultRowInfo
->
pResult
);
tfree
(
pResultRowInfo
->
pResult
);
}
}
void
resetResultRowInfo
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
void
resetResultRowInfo
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
)
{
if
(
pResultRowInfo
==
NULL
||
pResultRowInfo
->
capacity
==
0
)
{
return
;
return
;
}
}
...
@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
...
@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
getResultRow
(
pResultRowInfo
,
slot
)
->
closed
=
true
;
}
}
void
clearResultRow
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
)
{
void
clearResultRow
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int16_t
type
)
{
if
(
pResultRow
==
NULL
)
{
if
(
pResultRow
==
NULL
)
{
return
;
return
;
}
}
...
@@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index,
...
@@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index,
return
NULL
;
return
NULL
;
}
}
size_t
getResultRowSize
(
S
Query
RuntimeEnv
*
pRuntimeEnv
)
{
size_t
getResultRowSize
(
S
Task
RuntimeEnv
*
pRuntimeEnv
)
{
S
Query
Attr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
S
Task
Attr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
return
0
;
return
0
;
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
}
}
...
@@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
...
@@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return
(
int32_t
)
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
return
(
int32_t
)
taosArrayGetSize
(
pGroupResInfo
->
pRows
);
}
}
static
int64_t
getNumOfResultWindowRes
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int32_t
*
rowCellInfoOffset
)
{
static
int64_t
getNumOfResultWindowRes
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SResultRow
*
pResultRow
,
int32_t
*
rowCellInfoOffset
)
{
S
Query
Attr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
S
Task
Attr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
for
(
int32_t
j
=
0
;
j
<
pQueryAttr
->
numOfOutput
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQueryAttr
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
0
;
//pQueryAttr->pExpr1[j].base.functionId;
int32_t
functionId
=
0
;
//pQueryAttr->pExpr1[j].base.functionId;
...
@@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
...
@@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
}
}
}
}
void
orderTheResultRows
(
S
Query
RuntimeEnv
*
pRuntimeEnv
)
{
void
orderTheResultRows
(
S
Task
RuntimeEnv
*
pRuntimeEnv
)
{
__compar_fn_t
fn
=
NULL
;
__compar_fn_t
fn
=
NULL
;
if
(
pRuntimeEnv
->
pQueryAttr
->
order
.
order
==
TSDB_ORDER_ASC
)
{
if
(
pRuntimeEnv
->
pQueryAttr
->
order
.
order
==
TSDB_ORDER_ASC
)
{
fn
=
tsAscOrder
;
fn
=
tsAscOrder
;
...
@@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
...
@@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
taosArraySort
(
pRuntimeEnv
->
pResultRowArrayList
,
fn
);
taosArraySort
(
pRuntimeEnv
->
pResultRowArrayList
,
fn
);
}
}
static
int32_t
mergeIntoGroupResultImplRv
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
uint64_t
groupId
,
int32_t
*
rowCellInfoOffset
)
{
static
int32_t
mergeIntoGroupResultImplRv
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
uint64_t
groupId
,
int32_t
*
rowCellInfoOffset
)
{
if
(
!
pGroupResInfo
->
ordered
)
{
if
(
!
pGroupResInfo
->
ordered
)
{
orderTheResultRows
(
pRuntimeEnv
);
orderTheResultRows
(
pRuntimeEnv
);
pGroupResInfo
->
ordered
=
true
;
pGroupResInfo
->
ordered
=
true
;
...
@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR
...
@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
UNUSED_FUNC
int32_t
mergeIntoGroupResultImpl
(
S
Query
RuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
static
UNUSED_FUNC
int32_t
mergeIntoGroupResultImpl
(
S
Task
RuntimeEnv
*
pRuntimeEnv
,
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pTableList
,
int32_t
*
rowCellInfoOffset
)
{
int32_t
*
rowCellInfoOffset
)
{
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pRuntimeEnv
->
pQueryAttr
);
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pRuntimeEnv
->
pQueryAttr
);
...
@@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
...
@@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
return
code
;
return
code
;
}
}
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
S
Query
RuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
)
{
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
S
Task
RuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
)
{
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
while
(
pGroupResInfo
->
currentGroup
<
pGroupResInfo
->
totalGroup
)
{
...
...
source/libs/executor/src/executorMain.c
0 → 100644
浏览文件 @
00faaa41
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "exception.h"
#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"
typedef
struct
STaskMgmt
{
pthread_mutex_t
lock
;
SCacheObj
*
qinfoPool
;
// query handle pool
int32_t
vgId
;
bool
closed
;
}
STaskMgmt
;
static
void
taskMgmtKillTaskFn
(
void
*
handle
,
void
*
param1
)
{
void
**
fp
=
(
void
**
)
handle
;
qKillTask
(
*
fp
);
}
static
void
freeqinfoFn
(
void
*
qhandle
)
{
void
**
handle
=
qhandle
;
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
return
;
}
qKillTask
(
*
handle
);
qDestroyTask
(
*
handle
);
}
void
freeParam
(
STaskParam
*
param
)
{
tfree
(
param
->
sql
);
tfree
(
param
->
tagCond
);
tfree
(
param
->
tbnameCond
);
tfree
(
param
->
pTableIdList
);
taosArrayDestroy
(
param
->
pOperator
);
tfree
(
param
->
pExprs
);
tfree
(
param
->
pSecExprs
);
tfree
(
param
->
pExpr
);
tfree
(
param
->
pSecExpr
);
tfree
(
param
->
pGroupColIndex
);
tfree
(
param
->
pTagColumnInfo
);
tfree
(
param
->
pGroupbyExpr
);
tfree
(
param
->
prevResult
);
}
// todo parse json to get the operator tree.
int32_t
qCreateTask
(
void
*
tsdb
,
int32_t
vgId
,
void
*
pQueryMsg
,
qTaskInfo_t
*
pTaskInfo
,
uint64_t
taskId
)
{
assert
(
pQueryMsg
!=
NULL
&&
tsdb
!=
NULL
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
#if 0
STaskParam param = {0};
code = convertQueryMsg(pQueryMsg, ¶m);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, ¶m.pExprs, param.pExpr, param.pTagColumnInfo,
pQueryMsg->queryType, pQueryMsg, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (param.pSecExpr != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, ¶m.pSecExprs, param.pSecExpr, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
if (param.colCond != NULL) {
if ((code = createQueryFilter(param.colCond, pQueryMsg->colCondLen, ¶m.pFilters)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code);
if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
}
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
int64_t st = taosGetTimestampUs();
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
// also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
goto _over;
}
} else {
code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
}
assert(pQueryMsg->stableQuery == isSTableQuery);
(*pTaskInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo);
param.sql = NULL;
param.pExprs = NULL;
param.pSecExprs = NULL;
param.pGroupbyExpr = NULL;
param.pTagColumnInfo = NULL;
param.pFilters = NULL;
if ((*pTaskInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
param.pUdfInfo = NULL;
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pTaskInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(param.pGroupbyExpr->columnInfo);
}
tfree(param.colCond);
destroyUdfInfo(param.pUdfInfo);
taosArrayDestroy(param.pTableIdList);
param.pTableIdList = NULL;
freeParam(¶m);
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
SColumnInfo* column = pQueryMsg->tableCols + i;
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
filterFreeInfo(param.pFilters);
//pTaskInfo already freed in initQInfo, but *pTaskInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) {
*pTaskInfo = NULL;
}
#endif
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
#ifdef TEST_IMPL
// wait moment
int
waitMoment
(
SQInfo
*
pQInfo
){
if
(
pQInfo
->
sql
)
{
int
ms
=
0
;
char
*
pcnt
=
strstr
(
pQInfo
->
sql
,
" count(*)"
);
if
(
pcnt
)
return
0
;
char
*
pos
=
strstr
(
pQInfo
->
sql
,
" t_"
);
if
(
pos
){
pos
+=
3
;
ms
=
atoi
(
pos
);
while
(
*
pos
>=
'0'
&&
*
pos
<=
'9'
){
pos
++
;
}
char
unit_char
=
*
pos
;
if
(
unit_char
==
'h'
){
ms
*=
3600
*
1000
;
}
else
if
(
unit_char
==
'm'
){
ms
*=
60
*
1000
;
}
else
if
(
unit_char
==
's'
){
ms
*=
1000
;
}
}
if
(
ms
==
0
)
return
0
;
printf
(
"test wait sleep %dms. sql=%s ...
\n
"
,
ms
,
pQInfo
->
sql
);
if
(
ms
<
1000
)
{
taosMsleep
(
ms
);
}
else
{
int
used_ms
=
0
;
while
(
used_ms
<
ms
)
{
taosMsleep
(
1000
);
used_ms
+=
1000
;
if
(
isQueryKilled
(
pQInfo
)){
printf
(
"test check query is canceled, sleep break.%s
\n
"
,
pQInfo
->
sql
);
break
;
}
}
}
}
return
1
;
}
#endif
bool
qExecTask
(
qTaskInfo_t
qinfo
,
uint64_t
*
qId
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
&&
pQInfo
->
signature
==
pQInfo
);
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pQInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QInfo:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
pQInfo
->
qId
,
pQInfo
,
(
void
*
)
curOwner
);
pQInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
false
;
}
*
qId
=
pQInfo
->
qId
;
if
(
pQInfo
->
startExecTs
==
0
)
pQInfo
->
startExecTs
=
taosGetTimestampMs
();
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" it is already killed, abort"
,
pQInfo
->
qId
);
return
doBuildResCheck
(
pQInfo
);
}
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
if
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:0x%"
PRIx64
" no table exists for query, abort"
,
pQInfo
->
qId
);
// setTaskStatus(pRuntimeEnv, QUERY_COMPLETED);
return
doBuildResCheck
(
pQInfo
);
}
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pQInfo
->
runtimeEnv
.
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pQInfo
,
ret
);
pQInfo
->
code
=
ret
;
qDebug
(
"QInfo:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
pQInfo
->
qId
,
tstrerror
(
pQInfo
->
code
));
return
doBuildResCheck
(
pQInfo
);
}
qDebug
(
"QInfo:0x%"
PRIx64
" query task is launched"
,
pQInfo
->
qId
);
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pRuntimeEnv
->
proot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
int64_t
st
=
taosGetTimestampUs
();
pRuntimeEnv
->
outputBuf
=
pRuntimeEnv
->
proot
->
exec
(
pRuntimeEnv
->
proot
,
&
newgroup
);
pQInfo
->
summary
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
#ifdef TEST_IMPL
waitMoment
(
pQInfo
);
#endif
publishOperatorProfEvent
(
pRuntimeEnv
->
proot
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
pRuntimeEnv
->
resultInfo
.
total
+=
GET_NUM_OF_RESULTS
(
pRuntimeEnv
);
if
(
isQueryKilled
(
pQInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" query is killed"
,
pQInfo
->
qId
);
}
else
if
(
GET_NUM_OF_RESULTS
(
pRuntimeEnv
)
==
0
)
{
qDebug
(
"QInfo:0x%"
PRIx64
" over, %u tables queried, total %"
PRId64
" rows returned"
,
pQInfo
->
qId
,
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
,
pRuntimeEnv
->
resultInfo
.
total
);
}
else
{
qDebug
(
"QInfo:0x%"
PRIx64
" query paused, %d rows returned, total:%"
PRId64
" rows"
,
pQInfo
->
qId
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
pRuntimeEnv
->
resultInfo
.
total
);
}
return
doBuildResCheck
(
pQInfo
);
}
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
qError
(
"QInfo invalid qhandle"
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
tsRetrieveBlockingModel
)
{
pQInfo
->
rspContext
=
pRspContext
;
tsem_wait
(
&
pQInfo
->
ready
);
*
buildRes
=
true
;
code
=
pQInfo
->
code
;
}
else
{
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STaskAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
pthread_mutex_lock
(
&
pQInfo
->
lock
);
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QInfo:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QInfo:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
code
=
pQInfo
->
code
;
pthread_mutex_unlock
(
&
pQInfo
->
lock
);
}
return
code
;
}
void
*
qGetResultRetrieveMsg
(
qTaskInfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
assert
(
pQInfo
!=
NULL
);
return
pQInfo
->
rspContext
;
}
int32_t
qKillTask
(
qTaskInfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QInfo:0x%"
PRIx64
" query killed"
,
pQInfo
->
qId
);
setQueryKilled
(
pQInfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
pQInfo
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qIsTaskCompleted
(
qTaskInfo_t
qinfo
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
))
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
return
isQueryKilled
(
pQInfo
)
||
Q_STATUS_EQUAL
(
pQInfo
->
runtimeEnv
.
status
,
QUERY_OVER
);
}
void
qDestroyTask
(
qTaskInfo_t
qHandle
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qHandle
;
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
qDebug
(
"QInfo:0x%"
PRIx64
" query completed"
,
pQInfo
->
qId
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
doDestroyTask
(
pQInfo
);
}
void
*
qOpenTaskMgmt
(
int32_t
vgId
)
{
const
int32_t
refreshHandleInterval
=
30
;
// every 30 seconds, refresh handle pool
char
cacheName
[
128
]
=
{
0
};
sprintf
(
cacheName
,
"qhandle_%d"
,
vgId
);
STaskMgmt
*
pTaskMgmt
=
calloc
(
1
,
sizeof
(
STaskMgmt
));
if
(
pTaskMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
NULL
;
}
pTaskMgmt
->
qinfoPool
=
taosCacheInit
(
TSDB_CACHE_PTR_KEY
,
refreshHandleInterval
,
true
,
freeqinfoFn
,
cacheName
);
pTaskMgmt
->
closed
=
false
;
pTaskMgmt
->
vgId
=
vgId
;
pthread_mutex_init
(
&
pTaskMgmt
->
lock
,
NULL
);
qDebug
(
"vgId:%d, open queryTaskMgmt success"
,
vgId
);
return
pTaskMgmt
;
}
void
qTaskMgmtNotifyClosing
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
qInfo
(
"vgId:%d, set querymgmt closed, wait for all queries cancelled"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
true
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
taosCacheRefresh
(
pQueryMgmt
->
qinfoPool
,
taskMgmtKillTaskFn
,
NULL
);
}
void
qQueryMgmtReOpen
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
qInfo
(
"vgId:%d, set querymgmt reopen"
,
pQueryMgmt
->
vgId
);
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
pQueryMgmt
->
closed
=
false
;
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
}
void
qCleanupTaskMgmt
(
void
*
pQMgmt
)
{
if
(
pQMgmt
==
NULL
)
{
return
;
}
STaskMgmt
*
pQueryMgmt
=
pQMgmt
;
int32_t
vgId
=
pQueryMgmt
->
vgId
;
assert
(
pQueryMgmt
->
closed
);
SCacheObj
*
pqinfoPool
=
pQueryMgmt
->
qinfoPool
;
pQueryMgmt
->
qinfoPool
=
NULL
;
taosCacheCleanup
(
pqinfoPool
);
pthread_mutex_destroy
(
&
pQueryMgmt
->
lock
);
tfree
(
pQueryMgmt
);
qDebug
(
"vgId:%d, queryMgmt cleanup completed"
,
vgId
);
}
void
**
qRegisterTask
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
)
{
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QInfo:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QInfo:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
void
**
handle
=
taosCachePut
(
pQueryMgmt
->
qinfoPool
,
&
qId
,
sizeof
(
qId
),
&
qInfo
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
(
getMaximumIdleDurationSec
()
*
1000
));
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
return
handle
;
}
}
void
**
qAcquireTask
(
void
*
pMgmt
,
uint64_t
_key
)
{
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
closed
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
void
**
handle
=
taosCacheAcquireByKey
(
pQueryMgmt
->
qinfoPool
,
&
_key
,
sizeof
(
_key
));
if
(
handle
==
NULL
||
*
handle
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_INVALID_QHANDLE
;
return
NULL
;
}
else
{
return
handle
;
}
}
void
**
qReleaseTask
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
)
{
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
return
NULL
;
}
taosCacheRelease
(
pQueryMgmt
->
qinfoPool
,
pQInfo
,
freeHandle
);
return
0
;
}
#if 0
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireTask(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
qReleaseTask(pMgmt, (void **)&handle, true);
return error;
}
#endif
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
00faaa41
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlan.c
浏览文件 @
00faaa41
...
@@ -149,7 +149,7 @@ static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
...
@@ -149,7 +149,7 @@ static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
static
uint8_t
getScanFlag
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
static
uint8_t
getScanFlag
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
)
{
// todo
// todo
return
MA
STER
_SCAN
;
return
MA
IN
_SCAN
;
}
}
static
SPhyNode
*
createUserTableScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
,
int32_t
op
)
{
static
SPhyNode
*
createUserTableScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
,
int32_t
op
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录