Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
523855be
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
523855be
编写于
3月 01, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-13747 planner integrate
上级
138ffcae
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
581 addition
and
120 deletion
+581
-120
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+25
-1
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+7
-2
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+46
-0
include/libs/planner/planner.h
include/libs/planner/planner.h
+0
-29
include/util/tjson.h
include/util/tjson.h
+2
-0
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+2
-2
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+1
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+167
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+4
-0
source/libs/parser/inc/astCreateFuncs.h
source/libs/parser/inc/astCreateFuncs.h
+17
-4
source/libs/parser/inc/new_sql.y
source/libs/parser/inc/new_sql.y
+51
-2
source/libs/parser/src/astCreateFuncs.c
source/libs/parser/src/astCreateFuncs.c
+82
-1
source/libs/parser/src/astParse.c
source/libs/parser/src/astParse.c
+2
-1
source/libs/parser/src/astTranslate.c
source/libs/parser/src/astTranslate.c
+7
-0
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-2
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+87
-22
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+3
-3
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+3
-3
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+5
-5
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+17
-17
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+24
-24
source/util/src/tjson.c
source/util/src/tjson.c
+28
-0
未找到文件。
include/libs/nodes/cmdnodes.h
浏览文件 @
523855be
...
@@ -20,7 +20,7 @@
...
@@ -20,7 +20,7 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
#include "nodes.h"
#include "
query
nodes.h"
typedef
struct
SDatabaseOptions
{
typedef
struct
SDatabaseOptions
{
int32_t
numOfBlocks
;
int32_t
numOfBlocks
;
...
@@ -49,6 +49,30 @@ typedef struct SCreateDatabaseStmt {
...
@@ -49,6 +49,30 @@ typedef struct SCreateDatabaseStmt {
SDatabaseOptions
options
;
SDatabaseOptions
options
;
}
SCreateDatabaseStmt
;
}
SCreateDatabaseStmt
;
typedef
struct
STableOptions
{
int32_t
keep
;
int32_t
ttl
;
char
comments
[
TSDB_STB_COMMENT_LEN
];
}
STableOptions
;
typedef
struct
SColumnDefNode
{
ENodeType
type
;
char
colName
[
TSDB_COL_NAME_LEN
];
SDataType
dataType
;
char
comments
[
TSDB_STB_COMMENT_LEN
];
}
SColumnDefNode
;
typedef
struct
SCreateTableStmt
{
ENodeType
type
;
char
dbName
[
TSDB_DB_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
bool
ignoreExists
;
SNodeList
*
pCols
;
STableOptions
options
;
}
SCreateTableStmt
;
// CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definitionn] ...) [table_options]
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/libs/nodes/nodes.h
浏览文件 @
523855be
...
@@ -65,6 +65,7 @@ typedef enum ENodeType {
...
@@ -65,6 +65,7 @@ typedef enum ENodeType {
QUERY_NODE_TARGET
,
QUERY_NODE_TARGET
,
QUERY_NODE_DATABLOCK_DESC
,
QUERY_NODE_DATABLOCK_DESC
,
QUERY_NODE_SLOT_DESC
,
QUERY_NODE_SLOT_DESC
,
QUERY_NODE_COLUMN_DEF
,
// Statement nodes are used in parser and planner module.
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR
,
QUERY_NODE_SET_OPERATOR
,
...
@@ -72,12 +73,15 @@ typedef enum ENodeType {
...
@@ -72,12 +73,15 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_STMT
,
QUERY_NODE_SHOW_STMT
,
QUERY_NODE_VNODE_MODIF_STMT
,
QUERY_NODE_VNODE_MODIF_STMT
,
QUERY_NODE_CREATE_DATABASE_STMT
,
QUERY_NODE_CREATE_DATABASE_STMT
,
QUERY_NODE_CREATE_TABLE_STMT
,
// logic plan node
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN
,
QUERY_NODE_LOGIC_PLAN_SCAN
,
QUERY_NODE_LOGIC_PLAN_JOIN
,
QUERY_NODE_LOGIC_PLAN_JOIN
,
QUERY_NODE_LOGIC_PLAN_AGG
,
QUERY_NODE_LOGIC_PLAN_AGG
,
QUERY_NODE_LOGIC_PLAN_PROJECT
,
QUERY_NODE_LOGIC_PLAN_PROJECT
,
QUERY_NODE_LOGIC_SUBPLAN
,
QUERY_NODE_LOGIC_PLAN
,
// physical plan node
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
,
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
,
...
@@ -89,8 +93,9 @@ typedef enum ENodeType {
...
@@ -89,8 +93,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_AGG
,
QUERY_NODE_PHYSICAL_PLAN_AGG
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_DSINK_DISPATCH
QUERY_NODE_PHYSICAL_SUBPLAN
,
QUERY_NODE_PHYSICAL_PLAN
}
ENodeType
;
}
ENodeType
;
/**
/**
...
...
include/libs/nodes/plannodes.h
浏览文件 @
523855be
...
@@ -65,6 +65,18 @@ typedef struct SProjectLogicNode {
...
@@ -65,6 +65,18 @@ typedef struct SProjectLogicNode {
SNodeList
*
pProjections
;
SNodeList
*
pProjections
;
}
SProjectLogicNode
;
}
SProjectLogicNode
;
typedef
struct
SSubLogicPlan
{
ENodeType
type
;
SNodeList
*
pChildren
;
SNodeList
*
pParents
;
SLogicNode
*
pNode
;
}
SSubLogicPlan
;
typedef
struct
SQueryLogicPlan
{
ENodeType
type
;;
SNodeList
*
pSubplans
;
}
SQueryLogicPlan
;
typedef
struct
SSlotDescNode
{
typedef
struct
SSlotDescNode
{
ENodeType
type
;
ENodeType
type
;
int16_t
slotId
;
int16_t
slotId
;
...
@@ -98,6 +110,7 @@ typedef struct SScanPhysiNode {
...
@@ -98,6 +110,7 @@ typedef struct SScanPhysiNode {
int32_t
order
;
// scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t
order
;
// scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t
count
;
// repeat count
int32_t
count
;
// repeat count
int32_t
reverse
;
// reverse scan count
int32_t
reverse
;
// reverse scan count
char
tableName
[
TSDB_TABLE_NAME_LEN
];
}
SScanPhysiNode
;
}
SScanPhysiNode
;
typedef
SScanPhysiNode
SSystemTableScanPhysiNode
;
typedef
SScanPhysiNode
SSystemTableScanPhysiNode
;
...
@@ -159,6 +172,39 @@ typedef struct SDataInserterNode {
...
@@ -159,6 +172,39 @@ typedef struct SDataInserterNode {
char
*
pData
;
char
*
pData
;
}
SDataInserterNode
;
}
SDataInserterNode
;
typedef
struct
SSubplanId
{
uint64_t
queryId
;
int32_t
templateId
;
int32_t
subplanId
;
}
SSubplanId
;
typedef
enum
ESubplanType
{
SUBPLAN_TYPE_MERGE
=
1
,
SUBPLAN_TYPE_PARTIAL
,
SUBPLAN_TYPE_SCAN
,
SUBPLAN_TYPE_MODIFY
}
ESubplanType
;
typedef
struct
SSubplan
{
ENodeType
type
;
SSubplanId
id
;
// unique id of the subplan
ESubplanType
subplanType
;
int32_t
msgType
;
// message type for subplan, used to denote the send message type to vnode.
int32_t
level
;
// the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr
execNode
;
// for the scan/modify subplan, the optional execution node
SNodeList
*
pChildren
;
// the datasource subplan,from which to fetch the result
SNodeList
*
pParents
;
// the data destination subplan, get data from current subplan
SPhysiNode
*
pNode
;
// physical plan of current subplan
SDataSinkNode
*
pDataSink
;
// data of the subplan flow into the datasink
}
SSubplan
;
typedef
struct
SQueryPlan
{
ENodeType
type
;;
uint64_t
queryId
;
int32_t
numOfSubplans
;
SNodeList
*
pSubplans
;
// SNodeListNode. The execution level of subplan, starting from 0.
}
SQueryPlan
;
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/libs/planner/planner.h
浏览文件 @
523855be
...
@@ -22,35 +22,6 @@ extern "C" {
...
@@ -22,35 +22,6 @@ extern "C" {
#include "plannodes.h"
#include "plannodes.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
typedef
struct
SSubplanId
{
uint64_t
queryId
;
uint64_t
templateId
;
uint64_t
subplanId
;
}
SSubplanId
;
typedef
struct
SSubplan
{
SSubplanId
id
;
// unique id of the subplan
int32_t
type
;
// QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t
msgType
;
// message type for subplan, used to denote the send message type to vnode.
int32_t
level
;
// the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr
execNode
;
// for the scan/modify subplan, the optional execution node
SArray
*
pChildren
;
// the datasource subplan,from which to fetch the result
SArray
*
pParents
;
// the data destination subplan, get data from current subplan
SPhysiNode
*
pNode
;
// physical plan of current subplan
SDataSinkNode
*
pDataSink
;
// data of the subplan flow into the datasink
}
SSubplan
;
typedef
struct
SQueryPlan
{
uint64_t
queryId
;
int32_t
numOfSubplans
;
SArray
*
pSubplans
;
// SArray*<SArray*<SSubplan*>>. The execution level of subplan, starting from 0.
}
SQueryPlan
;
typedef
struct
SPlanContext
{
typedef
struct
SPlanContext
{
uint64_t
queryId
;
uint64_t
queryId
;
SNode
*
pAstRoot
;
SNode
*
pAstRoot
;
...
...
include/util/tjson.h
浏览文件 @
523855be
...
@@ -54,10 +54,12 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson);
...
@@ -54,10 +54,12 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson);
int32_t
tjsonAddObject
(
SJson
*
pJson
,
const
char
*
pName
,
FToJson
func
,
const
void
*
pObj
);
int32_t
tjsonAddObject
(
SJson
*
pJson
,
const
char
*
pName
,
FToJson
func
,
const
void
*
pObj
);
int32_t
tjsonAddItem
(
SJson
*
pJson
,
FToJson
func
,
const
void
*
pObj
);
int32_t
tjsonAddItem
(
SJson
*
pJson
,
FToJson
func
,
const
void
*
pObj
);
int32_t
tjsonAddArray
(
SJson
*
pJson
,
const
char
*
pName
,
FToJson
func
,
const
void
*
pArray
,
int32_t
itemSize
,
int32_t
num
);
typedef
int32_t
(
*
FToObject
)(
const
SJson
*
pJson
,
void
*
pObj
);
typedef
int32_t
(
*
FToObject
)(
const
SJson
*
pJson
,
void
*
pObj
);
int32_t
tjsonToObject
(
const
SJson
*
pJson
,
const
char
*
pName
,
FToObject
func
,
void
*
pObj
);
int32_t
tjsonToObject
(
const
SJson
*
pJson
,
const
char
*
pName
,
FToObject
func
,
void
*
pObj
);
int32_t
tjsonToArray
(
const
SJson
*
pJson
,
const
char
*
pName
,
FToObject
func
,
void
*
pArray
,
int32_t
itemSize
);
char
*
tjsonToString
(
const
SJson
*
pJson
);
char
*
tjsonToString
(
const
SJson
*
pJson
);
char
*
tjsonToUnformattedString
(
const
SJson
*
pJson
);
char
*
tjsonToUnformattedString
(
const
SJson
*
pJson
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
523855be
...
@@ -762,8 +762,8 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
...
@@ -762,8 +762,8 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
SVgObj
*
pVgroup
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pTopic
->
physicalPlan
);
SQueryPlan
*
pPlan
=
qStringToQueryPlan
(
pTopic
->
physicalPlan
);
SArray
*
pArray
=
NULL
;
SArray
*
pArray
=
NULL
;
S
Array
*
inner
=
taosArrayGet
(
pPlan
->
pSubplans
,
0
);
S
NodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
SSubplan
*
plan
=
taosArrayGetP
(
inner
,
0
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
SArray
*
unassignedVg
=
pSub
->
unassignedVg
;
SArray
*
unassignedVg
=
pSub
->
unassignedVg
;
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
...
...
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
523855be
...
@@ -26,7 +26,7 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
...
@@ -26,7 +26,7 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
}
}
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
if
(
QUERY_NODE_
DSINK
_DISPATCH
==
nodeType
(
pDataSink
))
{
if
(
QUERY_NODE_
PHYSICAL_PLAN
_DISPATCH
==
nodeType
(
pDataSink
))
{
return
createDataDispatcher
(
&
gDataSinkManager
,
pDataSink
,
pHandle
);
return
createDataDispatcher
(
&
gDataSinkManager
,
pDataSink
,
pHandle
);
}
}
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
523855be
...
@@ -90,6 +90,10 @@ static char* nodeName(ENodeType type) {
...
@@ -90,6 +90,10 @@ static char* nodeName(ENodeType type) {
return
"PhysiJoin"
;
return
"PhysiJoin"
;
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
return
"PhysiAgg"
;
return
"PhysiAgg"
;
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
"PhysiSubplan"
;
case
QUERY_NODE_PHYSICAL_PLAN
:
return
"PhysiPlan"
;
default:
default:
break
;
break
;
}
}
...
@@ -462,6 +466,164 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
...
@@ -462,6 +466,164 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return
code
;
return
code
;
}
}
static
const
char
*
jkSubplanIdQueryId
=
"QueryId"
;
static
const
char
*
jkSubplanIdTemplateId
=
"TemplateId"
;
static
const
char
*
jkSubplanIdSubplanId
=
"SubplanId"
;
static
int32_t
subplanIdToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSubplanId
*
pNode
=
(
const
SSubplanId
*
)
pObj
;
int32_t
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanIdQueryId
,
pNode
->
queryId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanIdTemplateId
,
pNode
->
templateId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanIdSubplanId
,
pNode
->
subplanId
);
}
return
code
;
}
static
int32_t
jsonToSubplanId
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SSubplanId
*
pNode
=
(
SSubplanId
*
)
pObj
;
int32_t
code
=
tjsonGetUBigIntValue
(
pJson
,
jkSubplanIdQueryId
,
&
pNode
->
queryId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanIdTemplateId
,
&
pNode
->
templateId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanIdSubplanId
,
&
pNode
->
subplanId
);
}
return
code
;
}
static
const
char
*
jkEndPointFqdn
=
"Fqdn"
;
static
const
char
*
jkEndPointPort
=
"Port"
;
static
int32_t
epToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SEp
*
pNode
=
(
const
SEp
*
)
pObj
;
int32_t
code
=
tjsonAddStringToObject
(
pJson
,
jkEndPointFqdn
,
pNode
->
fqdn
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkEndPointPort
,
pNode
->
port
);
}
return
code
;
}
static
int32_t
jsonToEp
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SEp
*
pNode
=
(
SEp
*
)
pObj
;
int32_t
code
=
tjsonGetStringValue
(
pJson
,
jkEndPointFqdn
,
pNode
->
fqdn
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetSmallIntValue
(
pJson
,
jkEndPointPort
,
&
pNode
->
port
);
}
return
code
;
}
static
const
char
*
jkQueryNodeAddrId
=
"Id"
;
static
const
char
*
jkQueryNodeAddrInUse
=
"InUse"
;
static
const
char
*
jkQueryNodeAddrNumOfEps
=
"NumOfEps"
;
static
const
char
*
jkQueryNodeAddrEps
=
"Eps"
;
static
int32_t
queryNodeAddrToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SQueryNodeAddr
*
pNode
=
(
const
SQueryNodeAddr
*
)
pObj
;
int32_t
code
=
tjsonAddIntegerToObject
(
pJson
,
jkQueryNodeAddrId
,
pNode
->
nodeId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkQueryNodeAddrInUse
,
pNode
->
epset
.
inUse
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkQueryNodeAddrNumOfEps
,
pNode
->
epset
.
numOfEps
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddArray
(
pJson
,
jkQueryNodeAddrEps
,
epToJson
,
pNode
->
epset
.
eps
,
sizeof
(
SEp
),
pNode
->
epset
.
numOfEps
);
}
return
code
;
}
static
int32_t
jsonToQueryNodeAddr
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SQueryNodeAddr
*
pNode
=
(
SQueryNodeAddr
*
)
pObj
;
int32_t
code
=
tjsonGetIntValue
(
pJson
,
jkQueryNodeAddrId
,
&
pNode
->
nodeId
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkQueryNodeAddrInUse
,
&
pNode
->
epset
.
inUse
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkQueryNodeAddrNumOfEps
,
&
pNode
->
epset
.
numOfEps
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonToArray
(
pJson
,
jkQueryNodeAddrEps
,
jsonToEp
,
pNode
->
epset
.
eps
,
sizeof
(
SEp
));
}
return
code
;
}
static
const
char
*
jkSubplanId
=
"Id"
;
static
const
char
*
jkSubplanType
=
"SubplanType"
;
static
const
char
*
jkSubplanMsgType
=
"MsgType"
;
static
const
char
*
jkSubplanLevel
=
"Level"
;
static
const
char
*
jkSubplanNodeAddr
=
"NodeAddr"
;
static
const
char
*
jkSubplanRootNode
=
"RootNode"
;
static
const
char
*
jkSubplanDataSink
=
"DataSink"
;
static
int32_t
subplanToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSubplan
*
pNode
=
(
const
SSubplan
*
)
pObj
;
int32_t
code
=
tjsonAddObject
(
pJson
,
jkSubplanId
,
subplanIdToJson
,
&
pNode
->
id
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanType
,
pNode
->
subplanType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanMsgType
,
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkSubplanLevel
,
pNode
->
level
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkSubplanNodeAddr
,
queryNodeAddrToJson
,
&
pNode
->
execNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkSubplanRootNode
,
nodeToJson
,
pNode
->
pNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkSubplanDataSink
,
nodeToJson
,
pNode
->
pDataSink
);
}
return
code
;
}
static
int32_t
jsonToSubplan
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SSubplan
*
pNode
=
(
SSubplan
*
)
pObj
;
int32_t
code
=
tjsonToObject
(
pJson
,
jkSubplanId
,
jsonToSubplanId
,
&
pNode
->
id
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
int32_t
val
;
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanType
,
&
val
);
pNode
->
subplanType
=
val
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanMsgType
,
&
pNode
->
msgType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkSubplanLevel
,
&
pNode
->
level
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonToObject
(
pJson
,
jkSubplanNodeAddr
,
jsonToQueryNodeAddr
,
&
pNode
->
execNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkSubplanRootNode
,
(
SNode
**
)
&
pNode
->
pNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkSubplanDataSink
,
(
SNode
**
)
&
pNode
->
pDataSink
);
}
return
code
;
}
static
const
char
*
jkAggLogicPlanGroupKeys
=
"GroupKeys"
;
static
const
char
*
jkAggLogicPlanGroupKeys
=
"GroupKeys"
;
static
const
char
*
jkAggLogicPlanAggFuncs
=
"AggFuncs"
;
static
const
char
*
jkAggLogicPlanAggFuncs
=
"AggFuncs"
;
...
@@ -1064,6 +1226,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
...
@@ -1064,6 +1226,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiJoinNodeToJson
(
pObj
,
pJson
);
return
physiJoinNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
return
physiAggNodeToJson
(
pObj
,
pJson
);
return
physiAggNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
subplanToJson
(
pObj
,
pJson
);
default:
default:
break
;
break
;
}
}
...
@@ -1127,13 +1291,15 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
...
@@ -1127,13 +1291,15 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiJoinNode
(
pJson
,
pObj
);
return
jsonToPhysiJoinNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
return
jsonToPhysiAggNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
jsonToSubplan
(
pJson
,
pObj
);
default:
default:
break
;
break
;
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
const
char
*
jkNodeType
=
"Type"
;
static
const
char
*
jkNodeType
=
"
Node
Type"
;
static
const
char
*
jkNodeName
=
"Name"
;
static
const
char
*
jkNodeName
=
"Name"
;
static
int32_t
nodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
static
int32_t
nodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
523855be
...
@@ -95,6 +95,10 @@ SNode* nodesMakeNode(ENodeType type) {
...
@@ -95,6 +95,10 @@ SNode* nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SJoinPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SJoinPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
case
QUERY_NODE_PHYSICAL_PLAN_AGG
:
return
makeNode
(
type
,
sizeof
(
SAggPhysiNode
));
return
makeNode
(
type
,
sizeof
(
SAggPhysiNode
));
case
QUERY_NODE_PHYSICAL_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
SSubplan
));
case
QUERY_NODE_PHYSICAL_PLAN
:
return
makeNode
(
type
,
sizeof
(
SQueryPlan
));
default:
default:
break
;
break
;
}
}
...
...
source/libs/parser/inc/astCreateFuncs.h
浏览文件 @
523855be
...
@@ -32,6 +32,11 @@ typedef struct SAstCreateContext {
...
@@ -32,6 +32,11 @@ typedef struct SAstCreateContext {
SNode
*
pRootNode
;
SNode
*
pRootNode
;
}
SAstCreateContext
;
}
SAstCreateContext
;
typedef
struct
STokenPair
{
SToken
first
;
SToken
second
;
}
STokenPair
;
extern
SToken
nil_token
;
extern
SToken
nil_token
;
void
initAstCreateContext
(
SParseContext
*
pParseCxt
,
SAstCreateContext
*
pCxt
);
void
initAstCreateContext
(
SParseContext
*
pParseCxt
,
SAstCreateContext
*
pCxt
);
...
@@ -76,8 +81,6 @@ SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit);
...
@@ -76,8 +81,6 @@ SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit);
SNode
*
createSelectStmt
(
SAstCreateContext
*
pCxt
,
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
);
SNode
*
createSelectStmt
(
SAstCreateContext
*
pCxt
,
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
);
SNode
*
createSetOperator
(
SAstCreateContext
*
pCxt
,
ESetOperatorType
type
,
SNode
*
pLeft
,
SNode
*
pRight
);
SNode
*
createSetOperator
(
SAstCreateContext
*
pCxt
,
ESetOperatorType
type
,
SNode
*
pLeft
,
SNode
*
pRight
);
SDatabaseOptions
*
createDefaultDatabaseOptions
(
SAstCreateContext
*
pCxt
);
typedef
enum
EDatabaseOptionType
{
typedef
enum
EDatabaseOptionType
{
DB_OPTION_BLOCKS
=
0
,
DB_OPTION_BLOCKS
=
0
,
DB_OPTION_CACHE
,
DB_OPTION_CACHE
,
...
@@ -99,11 +102,21 @@ typedef enum EDatabaseOptionType {
...
@@ -99,11 +102,21 @@ typedef enum EDatabaseOptionType {
DB_OPTION_MAX
DB_OPTION_MAX
}
EDatabaseOptionType
;
}
EDatabaseOptionType
;
SDatabaseOptions
*
createDefaultDatabaseOptions
(
SAstCreateContext
*
pCxt
);
SDatabaseOptions
*
setDatabaseOption
(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
EDatabaseOptionType
type
,
const
SToken
*
pVal
);
SDatabaseOptions
*
setDatabaseOption
(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
EDatabaseOptionType
type
,
const
SToken
*
pVal
);
SNode
*
createCreateDatabaseStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pDbName
,
SDatabaseOptions
*
pOptions
);
SNode
*
createCreateDatabaseStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pDbName
,
SDatabaseOptions
*
pOptions
);
typedef
enum
ETableOptionType
{
TABLE_OPTION_KEEP
=
0
,
TABLE_OPTION_TTL
,
TABLE_OPTION_COMMENT
,
TABLE_OPTION_MAX
}
ETableOptionType
;
STableOptions
*
createDefaultTableOptions
(
SAstCreateContext
*
pCxt
);
STableOptions
*
setTableOption
(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
ETableOptionType
type
,
const
SToken
*
pVal
);
SNode
*
createColumnDefNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pColName
,
SDataType
dataType
,
const
SToken
*
pComment
);
SNode
*
createCreateTableStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
STokenPair
*
pFullTableName
,
SNodeList
*
pCols
,
STableOptions
*
pOptions
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/parser/inc/new_sql.y
浏览文件 @
523855be
...
@@ -65,7 +65,7 @@
...
@@ -65,7 +65,7 @@
//%right NK_BITNOT.
//%right NK_BITNOT.
/************************************************ create database *****************************************************/
/************************************************ create database *****************************************************/
cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). {
PARSER_TRACE;
pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);}
cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);}
%type exists_opt { bool }
%type exists_opt { bool }
exists_opt(A) ::= IF NOT EXISTS. { A = true; }
exists_opt(A) ::= IF NOT EXISTS. { A = true; }
...
@@ -73,7 +73,7 @@ exists_opt(A) ::= .
...
@@ -73,7 +73,7 @@ exists_opt(A) ::= .
%type db_options { SDatabaseOptions* }
%type db_options { SDatabaseOptions* }
%destructor db_options { tfree($$); }
%destructor db_options { tfree($$); }
db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt);}
db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt);
}
db_options(A) ::= db_options(B) BLOCKS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BLOCKS, &C); }
db_options(A) ::= db_options(B) BLOCKS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BLOCKS, &C); }
db_options(A) ::= db_options(B) CACHE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHE, &C); }
db_options(A) ::= db_options(B) CACHE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHE, &C); }
db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(X)(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); }
db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(X)(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); }
...
@@ -92,6 +92,55 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
...
@@ -92,6 +92,55 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLESTABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); }
db_options(A) ::= db_options(B) SINGLESTABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); }
db_options(A) ::= db_options(B) STREAMMODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); }
db_options(A) ::= db_options(B) STREAMMODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); }
/************************************************ create table *******************************************************/
cmd ::= CREATE TABLE exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP table_options(D). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, &B, C, D);}
%type full_table_name { STokenPair }
%destructor full_table_name { }
full_table_name(A) ::= NK_ID(B). { A = { .first = B, .second = nil_token}; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { A = { .first = B, .second = C}; }
%type column_def_list { SNodeList* }
%destructor column_def_list { nodesDestroyList($$); }
column_def_list(A) ::= column_def(B). { A = createNodeList(pCxt, B); }
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, B, C, NULL); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }
type_name(A) ::= BOOL. { A = createDataType(TSDB_DATA_TYPE_BOOL); }
type_name(A) ::= TINYINT. { A = createDataType(TSDB_DATA_TYPE_TINYINT); }
type_name(A) ::= SMALLINT. { A = createDataType(TSDB_DATA_TYPE_SMALLINT); }
type_name(A) ::= INT. { A = createDataType(TSDB_DATA_TYPE_INT); }
type_name(A) ::= BIGINT. { A = createDataType(TSDB_DATA_TYPE_BIGINT); }
type_name(A) ::= FLOAT. { A = createDataType(TSDB_DATA_TYPE_FLOAT); }
type_name(A) ::= DOUBLE. { A = createDataType(TSDB_DATA_TYPE_DOUBLE); }
type_name(A) ::= BINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_BINARY, &B); }
type_name(A) ::= TIMESTAMP. { A = createDataType(TSDB_DATA_TYPE_TIMESTAMP); }
type_name(A) ::= NCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_NCHAR, &B); }
type_name(A) ::= TINYINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UTINYINT); }
type_name(A) ::= SMALLINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_USMALLINT); }
type_name(A) ::= INT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UINT); }
type_name(A) ::= BIGINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UBIGINT); }
type_name(A) ::= JSON. { A = createDataType(TSDB_DATA_TYPE_JSON); }
type_name(A) ::= VARCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARCHAR, &B); }
type_name(A) ::= MEDIUMBLOB. { A = createDataType(TSDB_DATA_TYPE_MEDIUMBLOB); }
type_name(A) ::= BLOB. { A = createDataType(TSDB_DATA_TYPE_BLOB); }
type_name(A) ::= VARBINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARBINARY, &B); }
type_name(A) ::= DECIMAL. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_COMMA NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
%type table_options { SDatabaseOptions* }
%destructor table_options { tfree($$); }
table_options(A) ::= . { A = createDefaultTableOptions(pCxt);}
table_options(A) ::= table_options(B) COMMENT NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); }
table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
//cmd ::= SHOW DATABASES. { PARSER_TRACE; createShowStmt(pCxt, SHOW_TYPE_DATABASE); }
//cmd ::= SHOW DATABASES. { PARSER_TRACE; createShowStmt(pCxt, SHOW_TYPE_DATABASE); }
/************************************************ select *************************************************************/
/************************************************ select *************************************************************/
...
...
source/libs/parser/src/astCreateFuncs.c
浏览文件 @
523855be
...
@@ -35,9 +35,11 @@
...
@@ -35,9 +35,11 @@
SToken
nil_token
=
{
.
type
=
TK_NIL
,
.
n
=
0
,
.
z
=
NULL
};
SToken
nil_token
=
{
.
type
=
TK_NIL
,
.
n
=
0
,
.
z
=
NULL
};
typedef
SDatabaseOptions
*
(
*
FSetDatabaseOption
)(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
const
SToken
*
pVal
);
typedef
SDatabaseOptions
*
(
*
FSetDatabaseOption
)(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
const
SToken
*
pVal
);
static
FSetDatabaseOption
setDbOptionFuncs
[
DB_OPTION_MAX
];
static
FSetDatabaseOption
setDbOptionFuncs
[
DB_OPTION_MAX
];
typedef
STableOptions
*
(
*
FSetTableOption
)(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
const
SToken
*
pVal
);
static
FSetTableOption
setTableOptionFuncs
[
TABLE_OPTION_MAX
];
static
SDatabaseOptions
*
setDbBlocks
(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
const
SToken
*
pVal
)
{
static
SDatabaseOptions
*
setDbBlocks
(
SAstCreateContext
*
pCxt
,
SDatabaseOptions
*
pOptions
,
const
SToken
*
pVal
)
{
int64_t
val
=
strtol
(
pVal
->
z
,
NULL
,
10
);
int64_t
val
=
strtol
(
pVal
->
z
,
NULL
,
10
);
if
(
val
<
TSDB_MIN_TOTAL_BLOCKS
||
val
>
TSDB_MAX_TOTAL_BLOCKS
)
{
if
(
val
<
TSDB_MIN_TOTAL_BLOCKS
||
val
>
TSDB_MAX_TOTAL_BLOCKS
)
{
...
@@ -263,12 +265,54 @@ static void initSetDatabaseOptionFp() {
...
@@ -263,12 +265,54 @@ static void initSetDatabaseOptionFp() {
setDbOptionFuncs
[
DB_OPTION_STREAMMODE
]
=
setDbStreamMode
;
setDbOptionFuncs
[
DB_OPTION_STREAMMODE
]
=
setDbStreamMode
;
}
}
static
STableOptions
*
setTableKeep
(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
const
SToken
*
pVal
)
{
int64_t
val
=
strtol
(
pVal
->
z
,
NULL
,
10
);
if
(
val
<
TSDB_MIN_KEEP
||
val
>
TSDB_MAX_KEEP
)
{
snprintf
(
pCxt
->
pQueryCxt
->
pMsg
,
pCxt
->
pQueryCxt
->
msgLen
,
"invalid table option keep: %d valid range: [%d, %d]"
,
val
,
TSDB_MIN_KEEP
,
TSDB_MAX_KEEP
);
pCxt
->
valid
=
false
;
return
pOptions
;
}
pOptions
->
keep
=
val
;
return
pOptions
;
}
static
STableOptions
*
setTableTtl
(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
const
SToken
*
pVal
)
{
int64_t
val
=
strtol
(
pVal
->
z
,
NULL
,
10
);
if
(
val
<
TSDB_MIN_DB_TTL_OPTION
)
{
snprintf
(
pCxt
->
pQueryCxt
->
pMsg
,
pCxt
->
pQueryCxt
->
msgLen
,
"invalid table option ttl: %d, should be greater than or equal to %d"
,
val
,
TSDB_MIN_DB_TTL_OPTION
);
pCxt
->
valid
=
false
;
return
pOptions
;
}
pOptions
->
ttl
=
val
;
return
pOptions
;
}
static
STableOptions
*
setTableComment
(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
const
SToken
*
pVal
)
{
if
(
pVal
->
n
>=
sizeof
(
pOptions
->
comments
))
{
snprintf
(
pCxt
->
pQueryCxt
->
pMsg
,
pCxt
->
pQueryCxt
->
msgLen
,
"invalid table option comment, length cannot exceed %d"
,
sizeof
(
pOptions
->
comments
)
-
1
);
pCxt
->
valid
=
false
;
return
pOptions
;
}
strncpy
(
pOptions
->
comments
,
pVal
->
z
,
pVal
->
n
);
return
pOptions
;
}
static
void
initSetTableOptionFp
()
{
setTableOptionFuncs
[
TABLE_OPTION_KEEP
]
=
setTableKeep
;
setTableOptionFuncs
[
TABLE_OPTION_TTL
]
=
setTableTtl
;
setTableOptionFuncs
[
TABLE_OPTION_COMMENT
]
=
setTableComment
;
}
void
initAstCreateContext
(
SParseContext
*
pParseCxt
,
SAstCreateContext
*
pCxt
)
{
void
initAstCreateContext
(
SParseContext
*
pParseCxt
,
SAstCreateContext
*
pCxt
)
{
pCxt
->
pQueryCxt
=
pParseCxt
;
pCxt
->
pQueryCxt
=
pParseCxt
;
pCxt
->
notSupport
=
false
;
pCxt
->
notSupport
=
false
;
pCxt
->
valid
=
true
;
pCxt
->
valid
=
true
;
pCxt
->
pRootNode
=
NULL
;
pCxt
->
pRootNode
=
NULL
;
initSetDatabaseOptionFp
();
initSetDatabaseOptionFp
();
initSetTableOptionFp
();
}
}
static
bool
checkDbName
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDbName
)
{
static
bool
checkDbName
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDbName
)
{
...
@@ -651,3 +695,40 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons
...
@@ -651,3 +695,40 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons
pStmt
->
options
=
*
pOptions
;
pStmt
->
options
=
*
pOptions
;
return
(
SNode
*
)
pStmt
;
return
(
SNode
*
)
pStmt
;
}
}
STableOptions
*
createDefaultTableOptions
(
SAstCreateContext
*
pCxt
)
{
STableOptions
*
pOptions
=
calloc
(
1
,
sizeof
(
STableOptions
));
CHECK_OUT_OF_MEM
(
pOptions
);
pOptions
->
keep
=
TSDB_DEFAULT_KEEP
;
pOptions
->
ttl
=
TSDB_DEFAULT_DB_TTL_OPTION
;
return
pOptions
;
}
STableOptions
*
setTableOption
(
SAstCreateContext
*
pCxt
,
STableOptions
*
pOptions
,
ETableOptionType
type
,
const
SToken
*
pVal
)
{
return
setTableOptionFuncs
[
type
](
pCxt
,
pOptions
,
pVal
);
}
SNode
*
createColumnDefNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pColName
,
SDataType
dataType
,
const
SToken
*
pComment
)
{
SColumnDefNode
*
pCol
=
(
SColumnDefNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN_DEF
);
CHECK_OUT_OF_MEM
(
pCol
);
strncpy
(
pCol
->
colName
,
pColName
->
z
,
pColName
->
n
);
pCol
->
dataType
=
dataType
;
if
(
NULL
!=
pComment
)
{
strncpy
(
pCol
->
colName
,
pColName
->
z
,
pColName
->
n
);
}
return
(
SNode
*
)
pCol
;
}
SNode
*
createCreateTableStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
STokenPair
*
pFullTableName
,
SNodeList
*
pCols
,
STableOptions
*
pOptions
)
{
SCreateTableStmt
*
pStmt
=
(
SCreateTableStmt
*
)
nodesMakeNode
(
QUERY_NODE_CREATE_TABLE_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
if
(
TK_NIL
!=
pFullTableName
->
first
.
type
)
{
strncpy
(
pStmt
->
dbName
,
pFullTableName
->
first
.
z
,
pFullTableName
->
first
.
n
);
}
strncpy
(
pStmt
->
tableName
,
pFullTableName
->
second
.
z
,
pFullTableName
->
second
.
n
);
pStmt
->
ignoreExists
=
ignoreExists
;
pStmt
->
pCols
=
pCols
;
pStmt
->
options
=
*
pOptions
;
return
(
SNode
*
)
pStmt
;
}
source/libs/parser/src/astParse.c
浏览文件 @
523855be
...
@@ -32,6 +32,7 @@ static bool isCmd(const SNode* pRootNode) {
...
@@ -32,6 +32,7 @@ static bool isCmd(const SNode* pRootNode) {
}
}
switch
(
nodeType
(
pRootNode
))
{
switch
(
nodeType
(
pRootNode
))
{
case
QUERY_NODE_SELECT_STMT
:
case
QUERY_NODE_SELECT_STMT
:
case
QUERY_NODE_CREATE_TABLE_STMT
:
return
false
;
return
false
;
default:
default:
break
;
break
;
...
@@ -74,7 +75,7 @@ int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
...
@@ -74,7 +75,7 @@ int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
}
}
default:
default:
NewParse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
NewParse
(
pParser
,
t0
.
type
,
t0
,
&
cxt
);
NewParseTrace
(
stdout
,
""
);
//
NewParseTrace(stdout, "");
if
(
!
cxt
.
valid
)
{
if
(
!
cxt
.
valid
)
{
goto
abort_parse
;
goto
abort_parse
;
}
}
...
...
source/libs/parser/src/astTranslate.c
浏览文件 @
523855be
...
@@ -829,6 +829,10 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS
...
@@ -829,6 +829,10 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
translateCreateTable
(
STranslateContext
*
pCxt
,
SCreateTableStmt
*
pStmt
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateQuery
(
STranslateContext
*
pCxt
,
SNode
*
pNode
)
{
static
int32_t
translateQuery
(
STranslateContext
*
pCxt
,
SNode
*
pNode
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
pNode
))
{
switch
(
nodeType
(
pNode
))
{
...
@@ -838,6 +842,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
...
@@ -838,6 +842,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case
QUERY_NODE_CREATE_DATABASE_STMT
:
case
QUERY_NODE_CREATE_DATABASE_STMT
:
code
=
translateCreateDatabase
(
pCxt
,
(
SCreateDatabaseStmt
*
)
pNode
);
code
=
translateCreateDatabase
(
pCxt
,
(
SCreateDatabaseStmt
*
)
pNode
);
break
;
break
;
case
QUERY_NODE_CREATE_TABLE_STMT
:
code
=
translateCreateTable
(
pCxt
,
(
SCreateTableStmt
*
)
pNode
);
break
;
default:
default:
break
;
break
;
}
}
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
523855be
...
@@ -41,8 +41,7 @@ extern "C" {
...
@@ -41,8 +41,7 @@ extern "C" {
int32_t
createLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
**
pLogicNode
);
int32_t
createLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
**
pLogicNode
);
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
int32_t
optimize
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
);
int32_t
createPhysiPlan
(
SLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
);
int32_t
createPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
);
int32_t
buildPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
523855be
...
@@ -497,23 +497,13 @@ void qDestroySubplan(SSubplan* pSubplan) {
...
@@ -497,23 +497,13 @@ void qDestroySubplan(SSubplan* pSubplan) {
#include "functionMgt.h"
#include "functionMgt.h"
typedef
struct
SSubLogicPlan
{
SNode
*
pRoot
;
// SLogicNode
bool
haveSuperTable
;
bool
haveSystemTable
;
}
SSubLogicPlan
;
int32_t
splitLogicPlan
(
SSubLogicPlan
*
pLogicPlan
)
{
// todo
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SSlotIndex
{
typedef
struct
SSlotIndex
{
int16_t
dataBlockId
;
int16_t
dataBlockId
;
int16_t
slotId
;
int16_t
slotId
;
}
SSlotIndex
;
}
SSlotIndex
;
typedef
struct
SPhysiPlanContext
{
typedef
struct
SPhysiPlanContext
{
SPlanContext
*
pPlanCxt
;
int32_t
errCode
;
int32_t
errCode
;
int16_t
nextDataBlockId
;
int16_t
nextDataBlockId
;
SArray
*
pLocationHelper
;
SArray
*
pLocationHelper
;
...
@@ -956,19 +946,94 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
...
@@ -956,19 +946,94 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
return
pPhyNode
;
return
pPhyNode
;
}
}
int32_t
createPhysiPlan
(
SLogicNode
*
pLogicNode
,
SPhysiNode
**
pPhyNode
)
{
static
SSubplan
*
createPhysiSubplan
(
SPhysiPlanContext
*
pCxt
,
SSubLogicPlan
*
pLogicSubplan
)
{
SPhysiPlanContext
cxt
=
{
.
errCode
=
TSDB_CODE_SUCCESS
,
.
nextDataBlockId
=
0
,
.
pLocationHelper
=
taosArrayInit
(
32
,
POINTER_BYTES
)
};
SSubplan
*
pSubplan
=
(
SSubplan
*
)
nodesMakeNode
(
QUERY_NODE_PHYSICAL_SUBPLAN
);
if
(
NULL
==
cxt
.
pLocationHelper
)
{
pSubplan
->
pNode
=
createPhysiNode
(
pCxt
,
pLogicSubplan
->
pNode
);
return
TSDB_CODE_OUT_OF_MEMORY
;
return
pSubplan
;
}
static
SQueryLogicPlan
*
createRawQueryLogicPlan
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
SQueryLogicPlan
*
pLogicPlan
=
(
SQueryLogicPlan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN
);
CHECK_ALLOC
(
pLogicPlan
,
NULL
);
pLogicPlan
->
pSubplans
=
nodesMakeList
();
CHECK_ALLOC
(
pLogicPlan
->
pSubplans
,
pLogicPlan
);
SNodeListNode
*
pTopSubplans
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
CHECK_ALLOC
(
pTopSubplans
,
pLogicPlan
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pLogicPlan
->
pSubplans
,
(
SNode
*
)
pTopSubplans
))
{
nodesDestroyNode
((
SNode
*
)
pTopSubplans
);
return
pLogicPlan
;
}
}
*
pPhyNode
=
createPhysiNode
(
&
cxt
,
pLogicNode
);
pTopSubplans
->
pNodeList
=
nodesMakeList
();
return
cxt
.
errCode
;
CHECK_ALLOC
(
pTopSubplans
->
pNodeList
,
pLogicPlan
);
SSubLogicPlan
*
pSubplan
=
(
SSubLogicPlan
*
)
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
CHECK_ALLOC
(
pSubplan
,
pLogicPlan
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pTopSubplans
->
pNodeList
,
(
SNode
*
)
pSubplan
))
{
nodesDestroyNode
((
SNode
*
)
pSubplan
);
return
pLogicPlan
;
}
pSubplan
->
pNode
=
(
SLogicNode
*
)
nodesCloneNode
((
SNode
*
)
pLogicNode
);
CHECK_ALLOC
(
pSubplan
->
pNode
,
pLogicPlan
);
return
pLogicPlan
;
}
static
int32_t
splitLogicPlan
(
SPhysiPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryLogicPlan
**
pLogicPlan
)
{
SQueryLogicPlan
*
pPlan
=
createRawQueryLogicPlan
(
pCxt
,
pLogicNode
);
if
(
TSDB_CODE_SUCCESS
!=
pCxt
->
errCode
)
{
nodesDestroyNode
((
SNode
*
)
pPlan
);
return
pCxt
->
errCode
;
}
// todo split
*
pLogicPlan
=
pPlan
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
buildPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
)
{
static
int32_t
buildPhysiPlan
(
SPhysiPlanContext
*
pCxt
,
SQueryLogicPlan
*
pLogicPlan
,
SQueryPlan
**
pPlan
)
{
// split
SQueryPlan
*
pQueryPlan
=
(
SQueryPlan
*
)
nodesMakeNode
(
QUERY_NODE_PHYSICAL_PLAN
);
// scale out
CHECK_ALLOC
(
pQueryPlan
,
TSDB_CODE_OUT_OF_MEMORY
);
// maping
*
pPlan
=
pQueryPlan
;
// create
pQueryPlan
->
queryId
=
pCxt
->
pPlanCxt
->
queryId
;
pQueryPlan
->
pSubplans
=
nodesMakeList
();
CHECK_ALLOC
(
pQueryPlan
->
pSubplans
,
TSDB_CODE_OUT_OF_MEMORY
);
SNode
*
pNode
;
FOREACH
(
pNode
,
pLogicPlan
->
pSubplans
)
{
SNodeListNode
*
pLevelSubplans
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
CHECK_ALLOC
(
pLevelSubplans
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pQueryPlan
->
pSubplans
,
(
SNode
*
)
pLevelSubplans
))
{
nodesDestroyNode
((
SNode
*
)
pLevelSubplans
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SNode
*
pLogicSubplan
;
FOREACH
(
pLogicSubplan
,
((
SNodeListNode
*
)
pNode
)
->
pNodeList
)
{
SSubplan
*
pSubplan
=
createPhysiSubplan
(
pCxt
,
(
SSubLogicPlan
*
)
pLogicSubplan
);
CHECK_ALLOC
(
pSubplan
,
TSDB_CODE_OUT_OF_MEMORY
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pLevelSubplans
->
pNodeList
,
(
SNode
*
)
pSubplan
))
{
nodesDestroyNode
((
SNode
*
)
pSubplan
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
++
(
pQueryPlan
->
numOfSubplans
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
createPhysiPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SQueryPlan
**
pPlan
)
{
SPhysiPlanContext
cxt
=
{
.
pPlanCxt
=
pCxt
,
.
errCode
=
TSDB_CODE_SUCCESS
,
.
nextDataBlockId
=
0
,
.
pLocationHelper
=
taosArrayInit
(
32
,
POINTER_BYTES
)
};
if
(
NULL
==
cxt
.
pLocationHelper
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SQueryLogicPlan
*
pLogicPlan
;
int32_t
code
=
splitLogicPlan
(
&
cxt
,
pLogicNode
,
&
pLogicPlan
);
// todo scale out
// todo maping
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildPhysiPlan
(
&
cxt
,
pLogicPlan
,
pPlan
);
}
nodesDestroyNode
((
SNode
*
)
pLogicPlan
);
return
code
;
}
source/libs/planner/src/planner.c
浏览文件 @
523855be
...
@@ -28,7 +28,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) {
...
@@ -28,7 +28,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) {
code
=
optimize
(
pCxt
,
pLogicNode
);
code
=
optimize
(
pCxt
,
pLogicNode
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
build
PhysiPlan
(
pCxt
,
pLogicNode
,
pPlan
);
code
=
create
PhysiPlan
(
pCxt
,
pLogicNode
,
pPlan
);
}
}
return
code
;
return
code
;
}
}
...
@@ -38,11 +38,11 @@ void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstrea
...
@@ -38,11 +38,11 @@ void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstrea
}
}
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
int32_t
qSubPlanToString
(
const
SSubplan
*
subplan
,
char
**
str
,
int32_t
*
len
)
{
return
nodesNodeToString
((
const
SNode
*
)
subplan
,
false
,
str
,
len
);
}
}
int32_t
qStringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
int32_t
qStringToSubplan
(
const
char
*
str
,
SSubplan
**
subplan
)
{
return
nodesStringToNode
(
str
,
(
SNode
**
)
subplan
);
}
}
char
*
qQueryPlanToString
(
const
SQueryPlan
*
pPlan
)
{
char
*
qQueryPlanToString
(
const
SQueryPlan
*
pPlan
)
{
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
523855be
...
@@ -70,14 +70,14 @@ protected:
...
@@ -70,14 +70,14 @@ protected:
cout
<<
toString
((
const
SNode
*
)
pLogicPlan
,
false
)
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pLogicPlan
,
false
)
<<
endl
;
if
(
TEST_PHYSICAL_PLAN
==
target
)
{
if
(
TEST_PHYSICAL_PLAN
==
target
)
{
S
PhysiNode
*
pPhy
Plan
=
nullptr
;
S
QueryPlan
*
p
Plan
=
nullptr
;
code
=
createPhysiPlan
(
pLogicPlan
,
&
pPhy
Plan
);
code
=
createPhysiPlan
(
&
cxt
,
pLogicPlan
,
&
p
Plan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] physical plan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] physical plan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
return
false
;
}
}
cout
<<
"unformatted physical plan : "
<<
endl
;
cout
<<
"unformatted physical plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pP
hyP
lan
,
false
)
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pPlan
,
false
)
<<
endl
;
}
}
return
true
;
return
true
;
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
523855be
...
@@ -111,7 +111,7 @@ typedef struct SSchJob {
...
@@ -111,7 +111,7 @@ typedef struct SSchJob {
void
*
transport
;
void
*
transport
;
SArray
*
nodeList
;
// qnode/vnode list, element is SQueryNodeAddr
SArray
*
nodeList
;
// qnode/vnode list, element is SQueryNodeAddr
SArray
*
levels
;
// Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray
*
levels
;
// Element is SQueryLevel, starting from 0. SArray<SSchLevel>
S
Array
*
subPlans
;
// subplan pointer copied from DAG, no need to free it in scheduler
S
NodeList
*
subPlans
;
// subplan pointer copied from DAG, no need to free it in scheduler
int32_t
levelIdx
;
int32_t
levelIdx
;
SEpSet
dataSrcEps
;
SEpSet
dataSrcEps
;
...
@@ -135,9 +135,9 @@ typedef struct SSchJob {
...
@@ -135,9 +135,9 @@ typedef struct SSchJob {
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->
type == QUERY
_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->
subplanType == SUBPLAN
_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->
type == QUERY
_TYPE_MODIFY)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->
subplanType == SUBPLAN
_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->
type == QUERY
_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->
subplanType == SUBPLAN
_TYPE_MODIFY)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...
@@ -145,7 +145,7 @@ typedef struct SSchJob {
...
@@ -145,7 +145,7 @@ typedef struct SSchJob {
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) !=
QUERY
_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) !=
SUBPLAN
_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
523855be
...
@@ -203,8 +203,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -203,8 +203,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for
(
int32_t
m
=
0
;
m
<
pLevel
->
taskNum
;
++
m
)
{
for
(
int32_t
m
=
0
;
m
<
pLevel
->
taskNum
;
++
m
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
m
);
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
m
);
SSubplan
*
pPlan
=
pTask
->
plan
;
SSubplan
*
pPlan
=
pTask
->
plan
;
int32_t
childNum
=
pPlan
->
pChildren
?
(
int32_t
)
taosArrayGetSize
(
pPlan
->
pChildren
)
:
0
;
int32_t
childNum
=
pPlan
->
pChildren
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pChildren
)
:
0
;
int32_t
parentNum
=
pPlan
->
pParents
?
(
int32_t
)
taosArrayGetSize
(
pPlan
->
pParents
)
:
0
;
int32_t
parentNum
=
pPlan
->
pParents
?
(
int32_t
)
LIST_LENGTH
(
pPlan
->
pParents
)
:
0
;
if
(
childNum
>
0
)
{
if
(
childNum
>
0
)
{
if
(
pJob
->
levelIdx
==
pLevel
->
level
)
{
if
(
pJob
->
levelIdx
==
pLevel
->
level
)
{
...
@@ -220,8 +220,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -220,8 +220,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
}
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
childNum
;
++
n
)
{
SSubplan
*
*
child
=
taosArrayGet
(
pPlan
->
pChildren
,
n
);
SSubplan
*
child
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pChildren
,
n
);
SSchTask
**
childTask
=
taosHashGet
(
planToTask
,
child
,
POINTER_BYTES
);
SSchTask
**
childTask
=
taosHashGet
(
planToTask
,
&
child
,
POINTER_BYTES
);
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
if
(
NULL
==
childTask
||
NULL
==
*
childTask
)
{
SCH_TASK_ELOG
(
"subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_TASK_ELOG
(
"subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
@@ -252,8 +252,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -252,8 +252,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
}
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
parentNum
;
++
n
)
{
SSubplan
*
*
parent
=
taosArrayGet
(
pPlan
->
pParents
,
n
);
SSubplan
*
parent
=
(
SSubplan
*
)
nodesListGetNode
(
pPlan
->
pParents
,
n
);
SSchTask
**
parentTask
=
taosHashGet
(
planToTask
,
parent
,
POINTER_BYTES
);
SSchTask
**
parentTask
=
taosHashGet
(
planToTask
,
&
parent
,
POINTER_BYTES
);
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
if
(
NULL
==
parentTask
||
NULL
==
*
parentTask
)
{
SCH_TASK_ELOG
(
"subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_TASK_ELOG
(
"subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
@@ -312,7 +312,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -312,7 +312,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
int32_t
levelNum
=
(
int32_t
)
taosArrayGetSize
(
pDag
->
pSubplans
);
int32_t
levelNum
=
(
int32_t
)
LIST_LENGTH
(
pDag
->
pSubplans
);
if
(
levelNum
<=
0
)
{
if
(
levelNum
<=
0
)
{
SCH_JOB_ELOG
(
"invalid level num:%d"
,
levelNum
);
SCH_JOB_ELOG
(
"invalid level num:%d"
,
levelNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
@@ -336,7 +336,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -336,7 +336,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob
->
subPlans
=
pDag
->
pSubplans
;
pJob
->
subPlans
=
pDag
->
pSubplans
;
SSchLevel
level
=
{
0
};
SSchLevel
level
=
{
0
};
S
Array
*
plans
=
NULL
;
S
NodeListNode
*
plans
=
NULL
;
int32_t
taskNum
=
0
;
int32_t
taskNum
=
0
;
SSchLevel
*
pLevel
=
NULL
;
SSchLevel
*
pLevel
=
NULL
;
...
@@ -351,13 +351,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -351,13 +351,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
pLevel
->
level
=
i
;
pLevel
->
level
=
i
;
plans
=
taosArrayGetP
(
pDag
->
pSubplans
,
i
);
plans
=
(
SNodeListNode
*
)
nodesListGetNode
(
pDag
->
pSubplans
,
i
);
if
(
NULL
==
plans
)
{
if
(
NULL
==
plans
)
{
SCH_JOB_ELOG
(
"empty level plan, level:%d"
,
i
);
SCH_JOB_ELOG
(
"empty level plan, level:%d"
,
i
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
taskNum
=
(
int32_t
)
taosArrayGetSize
(
plans
);
taskNum
=
(
int32_t
)
LIST_LENGTH
(
plans
->
pNodeList
);
if
(
taskNum
<=
0
)
{
if
(
taskNum
<=
0
)
{
SCH_JOB_ELOG
(
"invalid level plan number:%d, level:%d"
,
taskNum
,
i
);
SCH_JOB_ELOG
(
"invalid level plan number:%d, level:%d"
,
taskNum
,
i
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
@@ -372,9 +372,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -372,9 +372,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
}
for
(
int32_t
n
=
0
;
n
<
taskNum
;
++
n
)
{
for
(
int32_t
n
=
0
;
n
<
taskNum
;
++
n
)
{
SSubplan
*
plan
=
taosArrayGetP
(
plans
,
n
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
plans
->
pNodeList
,
n
);
SCH_SET_JOB_TYPE
(
&
pJob
->
attr
,
plan
->
t
ype
);
SCH_SET_JOB_TYPE
(
&
pJob
->
attr
,
plan
->
subplanT
ype
);
SSchTask
task
=
{
0
};
SSchTask
task
=
{
0
};
SSchTask
*
pTask
=
&
task
;
SSchTask
*
pTask
=
&
task
;
...
@@ -1420,18 +1420,18 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
...
@@ -1420,18 +1420,18 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
}
}
int32_t
schedulerConvertDagToTaskList
(
SQueryPlan
*
pDag
,
SArray
**
pTasks
)
{
int32_t
schedulerConvertDagToTaskList
(
SQueryPlan
*
pDag
,
SArray
**
pTasks
)
{
if
(
NULL
==
pDag
||
pDag
->
numOfSubplans
<=
0
||
taosArrayGetSize
(
pDag
->
pSubplans
)
==
0
)
{
if
(
NULL
==
pDag
||
pDag
->
numOfSubplans
<=
0
||
LIST_LENGTH
(
pDag
->
pSubplans
)
==
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
int32_t
levelNum
=
taosArrayGetSize
(
pDag
->
pSubplans
);
int32_t
levelNum
=
LIST_LENGTH
(
pDag
->
pSubplans
);
if
(
1
!=
levelNum
)
{
if
(
1
!=
levelNum
)
{
qError
(
"invalid level num: %d"
,
levelNum
);
qError
(
"invalid level num: %d"
,
levelNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
S
Array
*
plans
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
S
NodeListNode
*
plans
=
(
SNodeListNode
*
)
nodesListGetNode
(
pDag
->
pSubplans
,
0
);
int32_t
taskNum
=
taosArrayGetSize
(
plans
);
int32_t
taskNum
=
LIST_LENGTH
(
plans
->
pNodeList
);
if
(
taskNum
<=
0
)
{
if
(
taskNum
<=
0
)
{
qError
(
"invalid task num: %d"
,
taskNum
);
qError
(
"invalid task num: %d"
,
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
@@ -1449,7 +1449,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
...
@@ -1449,7 +1449,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
int32_t
code
=
0
;
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
SSubplan
*
plan
=
taosArrayGetP
(
plans
,
i
);
SSubplan
*
plan
=
(
SSubplan
*
)
nodesListGetNode
(
plans
->
pNodeList
,
i
);
tInfo
.
addr
=
plan
->
execNode
;
tInfo
.
addr
=
plan
->
execNode
;
code
=
qSubPlanToString
(
plan
,
&
msg
,
&
msgLen
);
code
=
qSubPlanToString
(
plan
,
&
msg
,
&
msgLen
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
523855be
...
@@ -78,9 +78,9 @@ void schtBuildQueryDag(SQueryPlan *dag) {
...
@@ -78,9 +78,9 @@ void schtBuildQueryDag(SQueryPlan *dag) {
dag
->
queryId
=
qId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
dag
->
numOfSubplans
,
POINTER_BYTES
);
dag
->
pSubplans
=
nodesMakeList
(
);
S
Array
*
scan
=
taosArrayInit
(
1
,
POINTER_BYTES
);
S
NodeListNode
*
scan
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
S
Array
*
merge
=
taosArrayInit
(
1
,
POINTER_BYTES
);
S
NodeListNode
*
merge
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
SSubplan
*
scanPlan
=
(
SSubplan
*
)
calloc
(
1
,
sizeof
(
SSubplan
));
SSubplan
*
scanPlan
=
(
SSubplan
*
)
calloc
(
1
,
sizeof
(
SSubplan
));
SSubplan
*
mergePlan
=
(
SSubplan
*
)
calloc
(
1
,
sizeof
(
SSubplan
));
SSubplan
*
mergePlan
=
(
SSubplan
*
)
calloc
(
1
,
sizeof
(
SSubplan
));
...
@@ -88,7 +88,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
...
@@ -88,7 +88,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan
->
id
.
queryId
=
qId
;
scanPlan
->
id
.
queryId
=
qId
;
scanPlan
->
id
.
templateId
=
0x0000000000000002
;
scanPlan
->
id
.
templateId
=
0x0000000000000002
;
scanPlan
->
id
.
subplanId
=
0x0000000000000003
;
scanPlan
->
id
.
subplanId
=
0x0000000000000003
;
scanPlan
->
type
=
QUERY
_TYPE_SCAN
;
scanPlan
->
subplanType
=
SUBPLAN
_TYPE_SCAN
;
scanPlan
->
execNode
.
nodeId
=
1
;
scanPlan
->
execNode
.
nodeId
=
1
;
scanPlan
->
execNode
.
epset
.
inUse
=
0
;
scanPlan
->
execNode
.
epset
.
inUse
=
0
;
...
@@ -96,30 +96,30 @@ void schtBuildQueryDag(SQueryPlan *dag) {
...
@@ -96,30 +96,30 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan
->
pChildren
=
NULL
;
scanPlan
->
pChildren
=
NULL
;
scanPlan
->
level
=
1
;
scanPlan
->
level
=
1
;
scanPlan
->
pParents
=
taosArrayInit
(
1
,
POINTER_BYTES
);
scanPlan
->
pParents
=
nodesMakeList
(
);
scanPlan
->
pNode
=
(
SPhysiNode
*
)
calloc
(
1
,
sizeof
(
SPhysiNode
));
scanPlan
->
pNode
=
(
SPhysiNode
*
)
calloc
(
1
,
sizeof
(
SPhysiNode
));
scanPlan
->
msgType
=
TDMT_VND_QUERY
;
scanPlan
->
msgType
=
TDMT_VND_QUERY
;
mergePlan
->
id
.
queryId
=
qId
;
mergePlan
->
id
.
queryId
=
qId
;
mergePlan
->
id
.
templateId
=
schtMergeTemplateId
;
mergePlan
->
id
.
templateId
=
schtMergeTemplateId
;
mergePlan
->
id
.
subplanId
=
0x5555
555555
;
mergePlan
->
id
.
subplanId
=
0x5555
;
mergePlan
->
type
=
QUERY
_TYPE_MERGE
;
mergePlan
->
subplanType
=
SUBPLAN
_TYPE_MERGE
;
mergePlan
->
level
=
0
;
mergePlan
->
level
=
0
;
mergePlan
->
execNode
.
epset
.
numOfEps
=
0
;
mergePlan
->
execNode
.
epset
.
numOfEps
=
0
;
mergePlan
->
pChildren
=
taosArrayInit
(
1
,
POINTER_BYTES
);
mergePlan
->
pChildren
=
nodesMakeList
(
);
mergePlan
->
pParents
=
NULL
;
mergePlan
->
pParents
=
NULL
;
mergePlan
->
pNode
=
(
SPhysiNode
*
)
calloc
(
1
,
sizeof
(
SPhysiNode
));
mergePlan
->
pNode
=
(
SPhysiNode
*
)
calloc
(
1
,
sizeof
(
SPhysiNode
));
mergePlan
->
msgType
=
TDMT_VND_QUERY
;
mergePlan
->
msgType
=
TDMT_VND_QUERY
;
SSubplan
*
mergePointer
=
(
SSubplan
*
)
taosArrayPush
(
merge
,
&
mergePlan
);
nodesListAppend
(
merge
->
pNodeList
,
(
SNode
*
)
mergePlan
);
SSubplan
*
scanPointer
=
(
SSubplan
*
)
taosArrayPush
(
scan
,
&
scanPlan
);
nodesListAppend
(
scan
->
pNodeList
,
(
SNode
*
)
scanPlan
);
taosArrayPush
(
mergePlan
->
pChildren
,
&
scanPlan
);
nodesListAppend
(
mergePlan
->
pChildren
,
(
SNode
*
)
scanPlan
);
taosArrayPush
(
scanPlan
->
pParents
,
&
mergePlan
);
nodesListAppend
(
scanPlan
->
pParents
,
(
SNode
*
)
mergePlan
);
taosArrayPush
(
dag
->
pSubplans
,
&
merge
);
nodesListAppend
(
dag
->
pSubplans
,
(
SNode
*
)
merge
);
taosArrayPush
(
dag
->
pSubplans
,
&
scan
);
nodesListAppend
(
dag
->
pSubplans
,
(
SNode
*
)
scan
);
}
}
void
schtFreeQueryDag
(
SQueryPlan
*
dag
)
{
void
schtFreeQueryDag
(
SQueryPlan
*
dag
)
{
...
@@ -132,15 +132,15 @@ void schtBuildInsertDag(SQueryPlan *dag) {
...
@@ -132,15 +132,15 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag
->
queryId
=
qId
;
dag
->
queryId
=
qId
;
dag
->
numOfSubplans
=
2
;
dag
->
numOfSubplans
=
2
;
dag
->
pSubplans
=
taosArrayInit
(
1
,
POINTER_BYTES
);
dag
->
pSubplans
=
nodesMakeList
(
);
S
Array
*
inserta
=
taosArrayInit
(
dag
->
numOfSubplans
,
POINTER_BYTES
);
S
NodeListNode
*
inserta
=
(
SNodeListNode
*
)
nodesMakeNode
(
QUERY_NODE_NODE_LIST
);
SSubplan
*
insertPlan
=
(
SSubplan
*
)
calloc
(
2
,
sizeof
(
SSubplan
));
SSubplan
*
insertPlan
=
(
SSubplan
*
)
calloc
(
2
,
sizeof
(
SSubplan
));
insertPlan
[
0
].
id
.
queryId
=
qId
;
insertPlan
[
0
].
id
.
queryId
=
qId
;
insertPlan
[
0
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
0
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
0
].
id
.
subplanId
=
0x0000000000000004
;
insertPlan
[
0
].
id
.
subplanId
=
0x0000000000000004
;
insertPlan
[
0
].
type
=
QUERY
_TYPE_MODIFY
;
insertPlan
[
0
].
subplanType
=
SUBPLAN
_TYPE_MODIFY
;
insertPlan
[
0
].
level
=
0
;
insertPlan
[
0
].
level
=
0
;
insertPlan
[
0
].
execNode
.
nodeId
=
1
;
insertPlan
[
0
].
execNode
.
nodeId
=
1
;
...
@@ -156,7 +156,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
...
@@ -156,7 +156,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan
[
1
].
id
.
queryId
=
qId
;
insertPlan
[
1
].
id
.
queryId
=
qId
;
insertPlan
[
1
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
1
].
id
.
templateId
=
0x0000000000000003
;
insertPlan
[
1
].
id
.
subplanId
=
0x0000000000000005
;
insertPlan
[
1
].
id
.
subplanId
=
0x0000000000000005
;
insertPlan
[
1
].
type
=
QUERY
_TYPE_MODIFY
;
insertPlan
[
1
].
subplanType
=
SUBPLAN
_TYPE_MODIFY
;
insertPlan
[
1
].
level
=
0
;
insertPlan
[
1
].
level
=
0
;
insertPlan
[
1
].
execNode
.
nodeId
=
1
;
insertPlan
[
1
].
execNode
.
nodeId
=
1
;
...
@@ -169,11 +169,11 @@ void schtBuildInsertDag(SQueryPlan *dag) {
...
@@ -169,11 +169,11 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan
[
1
].
pDataSink
=
(
SDataSinkNode
*
)
calloc
(
1
,
sizeof
(
SDataSinkNode
));
insertPlan
[
1
].
pDataSink
=
(
SDataSinkNode
*
)
calloc
(
1
,
sizeof
(
SDataSinkNode
));
insertPlan
[
1
].
msgType
=
TDMT_VND_SUBMIT
;
insertPlan
[
1
].
msgType
=
TDMT_VND_SUBMIT
;
taosArrayPush
(
inserta
,
&
insertPlan
);
nodesListAppend
(
inserta
->
pNodeList
,
(
SNode
*
)
insertPlan
);
insertPlan
+=
1
;
insertPlan
+=
1
;
taosArrayPush
(
inserta
,
&
insertPlan
);
nodesListAppend
(
inserta
->
pNodeList
,
(
SNode
*
)
insertPlan
);
taosArrayPush
(
dag
->
pSubplans
,
&
inserta
);
nodesListAppend
(
dag
->
pSubplans
,
(
SNode
*
)
inserta
);
}
}
...
@@ -347,7 +347,7 @@ void* schtRunJobThread(void *aa) {
...
@@ -347,7 +347,7 @@ void* schtRunJobThread(void *aa) {
char
*
dbname
=
"1.db1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
vgInfo
=
{
0
};
SQueryPlan
dag
=
{
0
}
;
SQueryPlan
dag
;
schtInitLogFile
();
schtInitLogFile
();
...
@@ -517,7 +517,7 @@ TEST(queryTest, normalCase) {
...
@@ -517,7 +517,7 @@ TEST(queryTest, normalCase) {
char
*
tablename
=
"table1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
vgInfo
=
{
0
};
SSchJob
*
pJob
=
NULL
;
SSchJob
*
pJob
=
NULL
;
SQueryPlan
dag
=
{
0
}
;
SQueryPlan
dag
;
schtInitLogFile
();
schtInitLogFile
();
...
@@ -620,7 +620,7 @@ TEST(insertTest, normalCase) {
...
@@ -620,7 +620,7 @@ TEST(insertTest, normalCase) {
char
*
dbname
=
"1.db1"
;
char
*
dbname
=
"1.db1"
;
char
*
tablename
=
"table1"
;
char
*
tablename
=
"table1"
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
vgInfo
=
{
0
};
SQueryPlan
dag
=
{
0
}
;
SQueryPlan
dag
;
uint64_t
numOfRows
=
0
;
uint64_t
numOfRows
=
0
;
schtInitLogFile
();
schtInitLogFile
();
...
...
source/util/src/tjson.c
浏览文件 @
523855be
...
@@ -73,6 +73,22 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
...
@@ -73,6 +73,22 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
return
tjsonAddItemToArray
(
pJson
,
pJobj
);
return
tjsonAddItemToArray
(
pJson
,
pJobj
);
}
}
int32_t
tjsonAddArray
(
SJson
*
pJson
,
const
char
*
pName
,
FToJson
func
,
const
void
*
pArray
,
int32_t
itemSize
,
int32_t
num
)
{
if
(
num
>
0
)
{
SJson
*
pJsonArray
=
tjsonAddArrayToObject
(
pJson
,
pName
);
if
(
NULL
==
pJsonArray
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
size_t
i
=
0
;
i
<
num
;
++
i
)
{
int32_t
code
=
tjsonAddItem
(
pJsonArray
,
func
,
(
const
char
*
)
pArray
+
itemSize
*
i
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
}
}
return
TSDB_CODE_SUCCESS
;
}
char
*
tjsonToString
(
const
SJson
*
pJson
)
{
return
cJSON_Print
((
cJSON
*
)
pJson
);
}
char
*
tjsonToString
(
const
SJson
*
pJson
)
{
return
cJSON_Print
((
cJSON
*
)
pJson
);
}
char
*
tjsonToUnformattedString
(
const
SJson
*
pJson
)
{
return
cJSON_PrintUnformatted
((
cJSON
*
)
pJson
);
}
char
*
tjsonToUnformattedString
(
const
SJson
*
pJson
)
{
return
cJSON_PrintUnformatted
((
cJSON
*
)
pJson
);
}
...
@@ -175,4 +191,16 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi
...
@@ -175,4 +191,16 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi
return
func
(
pJsonObj
,
pObj
);
return
func
(
pJsonObj
,
pObj
);
}
}
int32_t
tjsonToArray
(
const
SJson
*
pJson
,
const
char
*
pName
,
FToObject
func
,
void
*
pArray
,
int32_t
itemSize
)
{
const
cJSON
*
jArray
=
tjsonGetObjectItem
(
pJson
,
pName
);
int32_t
size
=
(
NULL
==
jArray
?
0
:
tjsonGetArraySize
(
jArray
));
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
int32_t
code
=
func
(
tjsonGetArrayItem
(
jArray
,
i
),
(
char
*
)
pArray
+
itemSize
*
i
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
}
return
TSDB_CODE_SUCCESS
;
}
SJson
*
tjsonParse
(
const
char
*
pStr
)
{
return
cJSON_Parse
(
pStr
);
}
SJson
*
tjsonParse
(
const
char
*
pStr
)
{
return
cJSON_Parse
(
pStr
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录