Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c8e13205
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c8e13205
编写于
3月 29, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
f6bb3f55
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
519 addition
and
47 deletion
+519
-47
include/libs/qcom/query.h
include/libs/qcom/query.h
+2
-2
source/libs/nodes/src/nodesToSQLFuncs.c
source/libs/nodes/src/nodesToSQLFuncs.c
+134
-0
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+2
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+63
-0
source/libs/qcom/inc/queryInt.h
source/libs/qcom/inc/queryInt.h
+22
-4
source/libs/qcom/src/queryExplain.c
source/libs/qcom/src/queryExplain.c
+286
-35
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+10
-5
未找到文件。
include/libs/qcom/query.h
浏览文件 @
c8e13205
...
...
@@ -53,11 +53,11 @@ typedef struct SIndexMeta {
}
SIndexMeta
;
typedef
struct
S
PhysiNode
ExplainResNode
{
typedef
struct
SExplainResNode
{
SNodeList
*
pChildren
;
SPhysiNode
*
pNode
;
void
*
pExecInfo
;
}
S
PhysiNode
ExplainResNode
;
}
SExplainResNode
;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
...
...
source/libs/nodes/src/nodesToSQLFuncs.c
0 → 100644
浏览文件 @
c8e13205
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "nodesUtil.h"
#include "plannodes.h"
#include "querynodes.h"
#include "taos.h"
#include "taoserror.h"
#include "thash.h"
char
*
gOperatorStr
[]
=
{
NULL
,
"+"
,
"-"
,
"*"
,
"/"
,
"%"
,
"&"
,
"|"
,
">"
,
">="
,
"<"
,
"<="
,
"="
,
"<>"
,
"IN"
,
"NOT IN"
,
"LIKE"
,
"NOT LIKE"
,
"MATCH"
,
"NMATCH"
,
"IS NULL"
,
"IS NOT NULL"
,
"IS TRUE"
,
"IS FALSE"
,
"IS UNKNOWN"
,
"IS NOT TRUE"
,
"IS NOT FALSE"
,
"IS NOT UNKNOWN"
};
char
*
gLogicConditionStr
[]
=
{
"AND"
,
"OR"
,
"NOT"
};
int32_t
nodesNodeToSQL
(
SNode
*
pNode
,
char
*
buf
,
int32_t
bufSize
,
int32_t
*
len
)
{
switch
(
pNode
->
type
)
{
case
QUERY_NODE_COLUMN
:
{
SColumnNode
*
colNode
=
(
SColumnNode
*
)
pNode
;
*
len
=
0
;
if
(
colNode
->
dbName
[
0
])
{
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"`%s`."
,
colNode
->
dbName
);
}
if
(
colNode
->
tableAlias
[
0
])
{
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"`%s`."
,
colNode
->
tableAlias
);
}
else
if
(
colNode
->
tableName
[
0
])
{
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"`%s`."
,
colNode
->
tableName
);
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"`%s`"
,
colNode
->
colName
);
return
TSDB_CODE_SUCCESS
;
}
case
QUERY_NODE_VALUE
:{
SValueNode
*
colNode
=
(
SValueNode
*
)
pNode
;
char
*
t
=
nodesGetStrValueFromNode
(
colNode
);
if
(
NULL
==
t
)
{
nodesError
(
"fail to get str value from valueNode"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"%s"
,
t
);
taosMemoryFree
(
t
);
return
TSDB_CODE_SUCCESS
;
}
case
QUERY_NODE_OPERATOR
:
{
SOperatorNode
*
pOpNode
=
(
SOperatorNode
*
)
pNode
;
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"("
);
if
(
pOpNode
->
pLeft
)
{
QRY_ERR_RET
(
nodesNodeToSQL
(
pOpNode
->
pLeft
,
buf
,
bufSize
,
len
));
}
if
(
pOpNode
->
opType
>=
(
sizeof
(
gOperatorStr
)
/
sizeof
(
gOperatorStr
[
0
])))
{
nodesError
(
"unknown operation type:%d"
,
pOpNode
->
opType
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
" %s "
,
gOperatorStr
[
pOpNode
->
opType
]);
if
(
pOpNode
->
pRight
)
{
QRY_ERR_RET
(
nodesNodeToSQL
(
pOpNode
->
pRight
,
buf
,
bufSize
,
len
));
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
")"
);
return
TSDB_CODE_SUCCESS
;
}
case
QUERY_NODE_LOGIC_CONDITION
:{
SLogicConditionNode
*
pLogicNode
=
(
SLogicConditionNode
*
)
pNode
;
SNode
*
node
=
NULL
;
bool
first
=
true
;
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"("
);
FOREACH
(
node
,
pLogicNode
->
pParameterList
)
{
if
(
!
first
)
{
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
" %s "
,
gLogicConditionStr
[
pLogicNode
->
condType
]);
}
QRY_ERR_RET
(
nodesNodeToSQL
(
node
,
buf
,
bufSize
,
len
));
first
=
false
;
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
")"
);
return
TSDB_CODE_SUCCESS
;
}
case
QUERY_NODE_FUNCTION
:{
SFunctionNode
*
pFuncNode
=
(
SFunctionNode
*
)
pNode
;
SNode
*
node
=
NULL
;
bool
first
=
true
;
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
"%s("
,
pFuncNode
->
functionName
);
FOREACH
(
node
,
pFuncNode
->
pParameterList
)
{
if
(
!
first
)
{
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
", "
);
}
QRY_ERR_RET
(
nodesNodeToSQL
(
node
,
buf
,
bufSize
,
len
));
first
=
false
;
}
*
len
+=
snprintf
(
buf
,
bufSize
-
*
len
,
")"
);
return
TSDB_CODE_SUCCESS
;
}
case
QUERY_NODE_NODE_LIST
:{
SNodeListNode
*
pListNode
=
(
SNodeListNode
*
)
pNode
;
//TODO
return
TSDB_CODE_SUCCESS
;
}
default:
break
;
}
nodesError
(
"nodesNodeToSQL unknown node = %s"
,
nodesNodeName
(
pNode
->
type
));
return
TSDB_CODE_QRY_APP_ERROR
;
}
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
c8e13205
...
...
@@ -17,7 +17,8 @@
typedef
enum
ETraversalOrder
{
TRAVERSAL_PREORDER
=
1
,
TRAVERSAL_POSTORDER
TRAVERSAL_INORDER
,
TRAVERSAL_POSTORDER
,
}
ETraversalOrder
;
static
EDealRes
walkList
(
SNodeList
*
pNodeList
,
ETraversalOrder
order
,
FNodeWalker
walker
,
void
*
pContext
);
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
c8e13205
...
...
@@ -467,6 +467,69 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
return
NULL
;
}
char
*
nodesGetStrValueFromNode
(
SValueNode
*
pNode
)
{
switch
(
pNode
->
node
.
resType
.
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
void
*
buf
=
taosMemoryMalloc
(
MAX_NUM_STR_SIZE
);
if
(
NULL
==
buf
)
{
return
NULL
;
}
sprintf
(
buf
,
"%s"
,
pNode
->
datum
.
b
?
"true"
:
"false"
);
return
buf
;
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
void
*
buf
=
taosMemoryMalloc
(
MAX_NUM_STR_SIZE
);
if
(
NULL
==
buf
)
{
return
NULL
;
}
sprintf
(
buf
,
"%"
PRId64
,
pNode
->
datum
.
i
);
return
buf
;
}
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
case
TSDB_DATA_TYPE_UINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
{
void
*
buf
=
taosMemoryMalloc
(
MAX_NUM_STR_SIZE
);
if
(
NULL
==
buf
)
{
return
NULL
;
}
sprintf
(
buf
,
"%"
PRIu64
,
pNode
->
datum
.
u
);
return
buf
;
}
case
TSDB_DATA_TYPE_FLOAT
:
case
TSDB_DATA_TYPE_DOUBLE
:
{
void
*
buf
=
taosMemoryMalloc
(
MAX_NUM_STR_SIZE
);
if
(
NULL
==
buf
)
{
return
NULL
;
}
sprintf
(
buf
,
"%e"
,
pNode
->
datum
.
d
);
return
buf
;
}
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
{
void
*
buf
=
taosMemoryMalloc
(
varDataLen
(
pNode
->
datum
.
p
)
+
1
);
if
(
NULL
==
buf
)
{
return
NULL
;
}
strncpy
(
buf
,
varDataVal
(
pNode
->
datum
.
p
),
varDataLen
(
pNode
->
datum
.
p
)
+
1
);
return
buf
;
}
default:
break
;
}
return
NULL
;
}
bool
nodesIsExprNode
(
const
SNode
*
pNode
)
{
ENodeType
type
=
nodeType
(
pNode
);
return
(
QUERY_NODE_COLUMN
==
type
||
QUERY_NODE_VALUE
==
type
||
QUERY_NODE_OPERATOR
==
type
||
QUERY_NODE_FUNCTION
==
type
);
...
...
source/libs/qcom/inc/queryInt.h
浏览文件 @
c8e13205
...
...
@@ -22,8 +22,24 @@ extern "C" {
#define QUERY_EXPLAIN_MAX_RES_LEN 1024
#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s"
#define EXPLAIN_ORDER_FORMAT "Order: "
#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System table scan on %s columns=%d"
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d groups=%d width=%d"
#define EXPLAIN_EXCHANGE_FORMAT "Exchange %d:1 width=%d"
#define EXPLAIN_SORT_FORMAT "Sort on %d columns width=%d"
#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%d%c offset=%d%c sliding=%d%c width=%d"
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_ON_CONDITIONS_FORMAT "ON Conditions: "
#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_LOOPS_FORMAT "loops %d"
#define EXPLAIN_REVERSE_FORMAT "reverse %d"
typedef
struct
SQueryExplainRowInfo
{
int32_t
level
;
...
...
@@ -31,9 +47,11 @@ typedef struct SQueryExplainRowInfo {
char
*
buf
;
}
SQueryExplainRowInfo
;
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define QUERY_EXPLAIN_NEWLINE(
_format, ...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN, _format
, __VA_ARGS__)
#define QUERY_EXPLAIN_APPEND(
_format, ...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, _format
, __VA_ARGS__)
#define QUERY_EXPLAIN_NEWLINE(
...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN
, __VA_ARGS__)
#define QUERY_EXPLAIN_APPEND(
...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen
, __VA_ARGS__)
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
...
...
source/libs/qcom/src/queryExplain.c
浏览文件 @
c8e13205
...
...
@@ -19,22 +19,124 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"
void
qFreeExplainRes
(
S
PhysiNodeExplainRes
*
res
)
{
void
qFreeExplainRes
(
S
ExplainResNode
*
res
)
{
}
char
*
qFillModeString
(
EFillMode
mode
)
{
switch
(
mode
)
{
case
FILL_MODE_NONE
:
return
"none"
;
case
FILL_MODE_VALUE
:
return
"value"
;
case
FILL_MODE_PREV
:
return
"prev"
;
case
FILL_MODE_NULL
:
return
"null"
;
case
FILL_MODE_LINEAR
:
return
"linear"
;
case
FILL_MODE_NEXT
:
return
"next"
;
default:
return
"unknown"
;
}
}
char
*
qGetNameFromColumnNode
(
SNode
*
pNode
)
{
if
(
NULL
==
pNode
||
QUERY_NODE_COLUMN
!=
pNode
->
type
)
{
return
"NULL"
;
}
return
((
SColumnNode
*
)
pNode
)
->
colName
;
}
int32_t
qMakeExplainResChildrenInfo
(
SPhysiNode
*
pNode
,
void
*
pExecInfo
,
SNodeList
**
pChildren
)
{
int32_t
tlen
=
0
;
SNodeList
*
pPhysiChildren
=
NULL
;
switch
(
pNode
->
type
)
{
case
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
:
{
STagScanPhysiNode
*
pTagScanNode
=
(
STagScanPhysiNode
*
)
pNode
;
pPhysiChildren
=
pTagScanNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:{
STableScanPhysiNode
*
pTblScanNode
=
(
STableScanPhysiNode
*
)
pNode
;
pPhysiChildren
=
pTblScanNode
->
scan
.
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
:{
SSystemTableScanPhysiNode
*
pSTblScanNode
=
(
SSystemTableScanPhysiNode
*
)
pNode
;
pPhysiChildren
=
pSTblScanNode
->
scan
.
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:{
SProjectPhysiNode
*
pPrjNode
=
(
SProjectPhysiNode
*
)
pNode
;
pPhysiChildren
=
pPrjNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_JOIN
:{
SJoinPhysiNode
*
pJoinNode
=
(
SJoinPhysiNode
*
)
pNode
;
pPhysiChildren
=
pJoinNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:{
SAggPhysiNode
*
pAggNode
=
(
SAggPhysiNode
*
)
pNode
;
pPhysiChildren
=
pAggNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:{
SExchangePhysiNode
*
pExchNode
=
(
SExchangePhysiNode
*
)
pNode
;
pPhysiChildren
=
pExchNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:{
SSortPhysiNode
*
pSortNode
=
(
SSortPhysiNode
*
)
pNode
;
pPhysiChildren
=
pSortNode
->
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:{
SIntervalPhysiNode
*
pIntNode
=
(
SIntervalPhysiNode
*
)
pNode
;
pPhysiChildren
=
pIntNode
->
window
.
node
.
pChildren
;
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:{
SSessionWinodwPhysiNode
*
pSessNode
=
(
SSessionWinodwPhysiNode
*
)
pNode
;
pPhysiChildren
=
pSessNode
->
window
.
node
.
pChildren
;
break
;
}
default:
qError
(
"not supported physical node type %d"
,
pNode
->
type
);
QRY_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
if
(
pPhysiChildren
)
{
*
pChildren
=
nodesMakeList
();
if
(
NULL
==
*
pChildren
)
{
qError
(
"nodesMakeList failed"
);
QRY_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SNode
*
node
=
NULL
;
SExplainResNode
*
pResNode
=
NULL
;
FOREACH
(
node
,
pPhysiChildren
)
{
QRY_ERR_RET
(
qMakeExplainResNode
((
SPhysiNode
*
)
node
,
pExecInfo
,
&
pResNode
));
QRY_ERR_RET
(
nodesListAppend
(
*
pChildren
,
pResNode
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qMakeExplainResNode
(
SPhysiNode
*
pNode
,
void
*
pExecInfo
,
S
PhysiNode
ExplainResNode
**
pRes
)
{
int32_t
qMakeExplainResNode
(
SPhysiNode
*
pNode
,
void
*
pExecInfo
,
SExplainResNode
**
pRes
)
{
if
(
NULL
==
pNode
)
{
*
pRes
=
NULL
;
qError
(
"physical node is NULL"
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
S
PhysiNodeExplainResNode
*
res
=
calloc
(
1
,
sizeof
(
SPhysiNode
ExplainResNode
));
S
ExplainResNode
*
res
=
taosMemoryCalloc
(
1
,
sizeof
(
S
ExplainResNode
));
if
(
NULL
==
res
)
{
qError
(
"calloc SPhysiNodeExplainRes failed"
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
...
...
@@ -56,7 +158,7 @@ _return:
QRY_RET
(
code
);
}
int32_t
qMakeTaskExplainResTree
(
struct
SSubplan
*
plan
,
void
*
pExecTree
,
S
PhysiNode
ExplainResNode
**
pRes
)
{
int32_t
qMakeTaskExplainResTree
(
struct
SSubplan
*
plan
,
void
*
pExecTree
,
SExplainResNode
**
pRes
)
{
char
*
tbuf
=
taosMemoryMalloc
(
QUERY_EXPLAIN_MAX_RES_LEN
);
if
(
NULL
==
tbuf
)
{
qError
(
"malloc size %d failed"
,
QUERY_EXPLAIN_MAX_RES_LEN
);
...
...
@@ -71,7 +173,7 @@ int32_t qMakeTaskExplainResTree(struct SSubplan *plan, void *pExecTree, SPhysiNo
QRY_RET
(
code
);
}
int32_t
qExplain
ResNodeAppendExecInfo
(
void
*
pExecInfo
,
char
*
tbuf
)
{
int32_t
qExplain
BufAppendExecInfo
(
void
*
pExecInfo
,
char
*
tbuf
,
int32_t
tlen
)
{
}
...
...
@@ -96,7 +198,7 @@ int32_t qExplainResAppendRow(SArray *pRows, char *tbuf, int32_t len, int32_t lev
}
int32_t
qExplainResNodeToRowsImpl
(
S
PhysiNode
ExplainResNode
*
pResNode
,
SArray
*
pRows
,
char
*
tbuf
,
int32_t
level
)
{
int32_t
qExplainResNodeToRowsImpl
(
SExplainResNode
*
pResNode
,
SArray
*
pRows
,
char
*
tbuf
,
int32_t
level
)
{
int32_t
tlen
=
0
;
SPhysiNode
*
pNode
=
pResNode
->
pNode
;
if
(
NULL
==
pNode
)
{
...
...
@@ -107,25 +209,177 @@ int32_t qExplainResNodeToRowsImpl(SPhysiNodeExplainResNode *pResNode, SArray *pR
switch
(
pNode
->
type
)
{
case
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
:
{
STagScanPhysiNode
*
pTagScanNode
=
(
STagScanPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_TAG_SCAN_FORMAT
,
pTagScanNode
->
tableName
.
tname
);
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_TAG_SCAN_FORMAT
,
pTagScanNode
->
tableName
.
tname
,
pTagScanNode
->
pScanCols
->
length
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainResNodeAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
));
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
));
}
QUERY_EXPLAIN_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pTagScanNode
->
count
);
if
(
pTagScanNode
->
reverse
)
{
QUERY_EXPLAIN_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pTagScanNode
->
reverse
);
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pTagScanNode
->
order
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:
case
QUERY_NODE_PHYSICAL_PLAN_JOIN
:
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:{
STableScanPhysiNode
*
pTblScanNode
=
(
STableScanPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_TBL_SCAN_FORMAT
,
pTblScanNode
->
scan
.
tableName
.
tname
,
pTblScanNode
->
scan
.
pScanCols
->
length
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QUERY_EXPLAIN_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pTblScanNode
->
scan
.
count
);
if
(
pTblScanNode
->
scan
.
reverse
)
{
QUERY_EXPLAIN_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pTblScanNode
->
scan
.
reverse
);
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pTblScanNode
->
scan
.
order
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
if
(
pTblScanNode
->
pScanConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pTblScanNode
->
pScanConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
:{
SSystemTableScanPhysiNode
*
pSTblScanNode
=
(
SSystemTableScanPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_SYSTBL_SCAN_FORMAT
,
pSTblScanNode
->
scan
.
tableName
.
tname
,
pSTblScanNode
->
scan
.
pScanCols
->
length
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QUERY_EXPLAIN_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pSTblScanNode
->
scan
.
count
);
if
(
pSTblScanNode
->
scan
.
reverse
)
{
QUERY_EXPLAIN_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pSTblScanNode
->
scan
.
reverse
);
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pSTblScanNode
->
scan
.
order
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_PROJECT
:{
SProjectPhysiNode
*
pPrjNode
=
(
SProjectPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_PROJECTION_FORMAT
,
pPrjNode
->
pProjections
->
length
,
pPrjNode
->
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pPrjNode
->
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pPrjNode
->
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_JOIN
:{
SJoinPhysiNode
*
pJoinNode
=
(
SJoinPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_JOIN_FORMAT
,
EXPLAIN_JOIN_STRING
(
pJoinNode
->
joinType
),
pJoinNode
->
pTargets
->
length
,
pJoinNode
->
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pJoinNode
->
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pJoinNode
->
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_ON_CONDITIONS_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pJoinNode
->
pOnConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:{
SAggPhysiNode
*
pAggNode
=
(
SAggPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_AGG_FORMAT
,
pAggNode
->
pAggFuncs
->
length
,
pAggNode
->
pGroupKeys
->
length
,
pAggNode
->
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pAggNode
->
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pAggNode
->
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
:{
SExchangePhysiNode
*
pExchNode
=
(
SExchangePhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_EXCHANGE_FORMAT
,
pExchNode
->
pSrcEndPoints
->
length
,
pExchNode
->
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pExchNode
->
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pExchNode
->
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:{
SSortPhysiNode
*
pSortNode
=
(
SSortPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_SORT_FORMAT
,
pSortNode
->
pSortKeys
->
length
,
pSortNode
->
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pSortNode
->
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pSortNode
->
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:{
SIntervalPhysiNode
*
pIntNode
=
(
SIntervalPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_INTERVAL_FORMAT
,
qGetNameFromColumnNode
(
pIntNode
->
pTspk
),
pIntNode
->
window
.
pFuncs
->
length
,
pIntNode
->
interval
,
pIntNode
->
intervalUnit
,
pIntNode
->
offset
,
pIntNode
->
intervalUnit
,
pIntNode
->
sliding
,
pIntNode
->
slidingUnit
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pIntNode
->
pFill
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILL_FORMAT
,
qFillModeString
(
pIntNode
->
pFill
->
mode
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
if
(
pIntNode
->
window
.
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pIntNode
->
window
.
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
case
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
:{
SSessionWinodwPhysiNode
*
pIntNode
=
(
SSessionWinodwPhysiNode
*
)
pNode
;
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_SESSION_FORMAT
,
pIntNode
->
gap
,
pIntNode
->
window
.
pFuncs
->
length
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
resultRowSize
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
tlen
));
}
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
));
if
(
pIntNode
->
window
.
node
.
pConditions
)
{
QUERY_EXPLAIN_NEWLINE
(
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pIntNode
->
window
.
node
.
pConditions
,
tbuf
,
QUERY_EXPLAIN_MAX_RES_LEN
,
&
tlen
));
QRY_ERR_RET
(
qExplainResAppendRow
(
pRows
,
tbuf
,
tlen
,
level
+
1
));
}
break
;
}
default:
qError
(
"not supported physical node type %d"
,
pNode
->
type
);
return
TSDB_CODE_QRY_APP_ERROR
;
...
...
@@ -135,7 +389,7 @@ int32_t qExplainResNodeToRowsImpl(SPhysiNodeExplainResNode *pResNode, SArray *pR
}
int32_t
qExplainResNodeToRows
(
S
PhysiNode
ExplainResNode
*
pResNode
,
SArray
*
pRsp
,
char
*
tbuf
,
int32_t
level
)
{
int32_t
qExplainResNodeToRows
(
SExplainResNode
*
pResNode
,
SArray
*
pRsp
,
char
*
tbuf
,
int32_t
level
)
{
if
(
NULL
==
pResNode
)
{
qError
(
"explain res node is NULL"
);
QRY_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
@@ -146,14 +400,22 @@ int32_t qExplainResNodeToRows(SPhysiNodeExplainResNode *pResNode, SArray *pRsp,
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pResNode
->
pChildren
)
{
QRY_ERR_RET
(
qExplainResNodeToRows
((
S
PhysiNode
ExplainResNode
*
)
pNode
,
pRsp
,
tbuf
,
level
+
1
));
QRY_ERR_RET
(
qExplainResNodeToRows
((
SExplainResNode
*
)
pNode
,
pRsp
,
tbuf
,
level
+
1
));
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
qExplainRowsToRsp
(
SArray
*
rows
,
SRetrieveTableRsp
**
pRsp
)
{
int32_t
rspSize
=
sizeof
(
SRetrieveTableRsp
)
+
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
taosMemoryCalloc
(
1
,
rspSize
);
if
(
NULL
==
rsp
)
{
qError
(
"malloc SRetrieveTableRsp failed"
);
QRY_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
int32_t
qMakeTaskExplainResRows
(
S
PhysiNode
ExplainResNode
*
pResNode
,
SRetrieveTableRsp
**
pRsp
)
{
int32_t
qMakeTaskExplainResRows
(
SExplainResNode
*
pResNode
,
SRetrieveTableRsp
**
pRsp
)
{
if
(
NULL
==
pResNode
)
{
qError
(
"explain res node is NULL"
);
QRY_RET
(
TSDB_CODE_QRY_APP_ERROR
);
...
...
@@ -174,22 +436,11 @@ int32_t qMakeTaskExplainResRows(SPhysiNodeExplainResNode *pResNode, SRetrieveTab
QRY_ERR_JRET
(
qExplainResNodeToRows
(
pResNode
,
rows
,
tbuf
,
0
));
int32_t
rspSize
=
sizeof
(
SRetrieveTableRsp
)
+
;
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
taosMemoryCalloc
(
1
,
rspSize
);
if
(
NULL
==
rsp
)
{
qError
(
"malloc SRetrieveTableRsp failed"
);
QRY_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
QRY_ERR_JRET
(
qExplainRowsToRsp
(
rows
,
rsp
));
*
pRsp
=
rsp
;
rsp
=
NULL
;
QRY_ERR_JRET
(
qExplainRowsToRsp
(
rows
,
pRsp
));
_return:
taosMemoryFree
(
tbuf
);
taosMemoryFree
(
rsp
);
taosArrayDestroy
(
rows
);
QRY_RET
(
code
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
c8e13205
...
...
@@ -2126,19 +2126,24 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
tsem_init
(
&
pJob
->
rspSem
,
0
,
0
);
pJob
->
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
if
(
pJob
->
refId
<
0
)
{
SCH_JOB_ELOG
(
"taos
HashPut
job failed, error:%s"
,
tstrerror
(
terrno
));
int64_t
refId
=
taosAddRef
(
schMgmt
.
jobRef
,
pJob
);
if
(
refId
<
0
)
{
SCH_JOB_ELOG
(
"taos
AddRef
job failed, error:%s"
,
tstrerror
(
terrno
));
SCH_ERR_JRET
(
terrno
);
}
if
(
NULL
==
schAcquireJob
(
refId
))
{
SCH_JOB_ELOG
(
"schAcquireJob job failed, refId:%"
PRIx64
,
refId
);
SCH_RET
(
TSDB_CODE_SCH_STATUS_ERROR
);
}
pJob
->
refId
=
refId
;
SCH_JOB_DLOG
(
"job refId:%"
PRIx64
,
pJob
->
refId
);
pJob
->
status
=
JOB_TASK_STATUS_NOT_START
;
SCH_ERR_JRET
(
schLaunchJob
(
pJob
));
schAcquireJob
(
pJob
->
refId
);
*
job
=
pJob
->
refId
;
if
(
syncSchedule
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录