Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f18cdd5f
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f18cdd5f
编写于
4月 09, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/node
上级
6810e82f
990fe3fa
变更
22
展开全部
隐藏空白更改
内联
并排
Showing
22 changed file
with
3049 addition
and
2737 deletion
+3049
-2737
include/common/tdatablock.h
include/common/tdatablock.h
+50
-3
include/common/ttokendef.h
include/common/ttokendef.h
+163
-158
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+4
-4
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+8
-0
include/util/tdef.h
include/util/tdef.h
+2
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+16
-9
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+25
-69
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+0
-1
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+33
-16
source/libs/nodes/src/nodesToSQLFuncs.c
source/libs/nodes/src/nodesToSQLFuncs.c
+6
-2
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+22
-11
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+2
-2
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+17
-6
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+8
-6
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+5
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+52
-8
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2480
-2427
source/libs/parser/test/parserAstTest.cpp
source/libs/parser/test/parserAstTest.cpp
+42
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+3
-1
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+10
-0
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+100
-14
未找到文件。
include/common/tdatablock.h
浏览文件 @
f18cdd5f
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#define _TD_COMMON_EP_H_
#define _TD_COMMON_EP_H_
#include "tcommon.h"
#include "tcommon.h"
#include "tcompression.h"
#include "tmsg.h"
#include "tmsg.h"
#ifdef __cplusplus
#ifdef __cplusplus
...
@@ -115,11 +116,11 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
...
@@ -115,11 +116,11 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
static
FORCE_INLINE
void
colDataAppendNNULL
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
start
,
size_t
nRows
)
{
static
FORCE_INLINE
void
colDataAppendNNULL
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
start
,
size_t
nRows
)
{
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
pColumnInfoData
->
varmeta
.
offset
[
i
]
=
-
1
;
// it is a null value of VAR type.
pColumnInfoData
->
varmeta
.
offset
[
i
]
=
-
1
;
// it is a null value of VAR type.
}
}
}
else
{
}
else
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
for
(
int32_t
i
=
start
;
i
<
start
+
nRows
;
++
i
)
{
colDataSetNull_f
(
pColumnInfoData
->
nullbitmap
,
i
);
colDataSetNull_f
(
pColumnInfoData
->
nullbitmap
,
i
);
}
}
}
}
...
@@ -200,12 +201,58 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
...
@@ -200,12 +201,58 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
);
size_t
blockDataGetCapacityInRow
(
const
SSDataBlock
*
pBlock
,
size_t
pageSize
);
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
int32_t
blockDataTrimFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
);
int32_t
blockDataTrimFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
void
blockDebugShowData
(
const
SArray
*
dataBlocks
);
static
FORCE_INLINE
int32_t
blockCompressColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
int32_t
colSize
=
colDataGetLength
(
pColRes
,
numOfRows
);
return
(
*
(
tDataTypes
[
pColRes
->
info
.
type
].
compFunc
))(
pColRes
->
pData
,
colSize
,
numOfRows
,
data
,
colSize
+
COMP_OVERFLOW_BYTES
,
compressed
,
NULL
,
0
);
}
static
FORCE_INLINE
void
blockCompressEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
)
{
int32_t
*
colSizes
=
(
int32_t
*
)
data
;
data
+=
numOfCols
*
sizeof
(
int32_t
);
*
dataLen
=
(
numOfCols
*
sizeof
(
int32_t
));
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
col
);
// copy the null bitmap
if
(
IS_VAR_DATA_TYPE
(
pColRes
->
info
.
type
))
{
size_t
metaSize
=
numOfRows
*
sizeof
(
int32_t
);
memcpy
(
data
,
pColRes
->
varmeta
.
offset
,
metaSize
);
data
+=
metaSize
;
(
*
dataLen
)
+=
metaSize
;
}
else
{
int32_t
len
=
BitmapLen
(
numOfRows
);
memcpy
(
data
,
pColRes
->
nullbitmap
,
len
);
data
+=
len
;
(
*
dataLen
)
+=
len
;
}
if
(
needCompress
)
{
colSizes
[
col
]
=
blockCompressColData
(
pColRes
,
numOfRows
,
data
,
needCompress
);
data
+=
colSizes
[
col
];
(
*
dataLen
)
+=
colSizes
[
col
];
}
else
{
colSizes
[
col
]
=
colDataGetLength
(
pColRes
,
numOfRows
);
(
*
dataLen
)
+=
colSizes
[
col
];
memmove
(
data
,
pColRes
->
pData
,
colSizes
[
col
]);
data
+=
colSizes
[
col
];
}
colSizes
[
col
]
=
htonl
(
colSizes
[
col
]);
}
}
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
include/common/ttokendef.h
浏览文件 @
f18cdd5f
...
@@ -59,164 +59,169 @@
...
@@ -59,164 +59,169 @@
#define TK_LOCAL 41
#define TK_LOCAL 41
#define TK_QNODE 42
#define TK_QNODE 42
#define TK_ON 43
#define TK_ON 43
#define TK_DATABASE 44
#define TK_BNODE 44
#define TK_USE 45
#define TK_SNODE 45
#define TK_IF 46
#define TK_MNODE 46
#define TK_NOT 47
#define TK_DATABASE 47
#define TK_EXISTS 48
#define TK_USE 48
#define TK_BLOCKS 49
#define TK_IF 49
#define TK_CACHE 50
#define TK_NOT 50
#define TK_CACHELAST 51
#define TK_EXISTS 51
#define TK_COMP 52
#define TK_BLOCKS 52
#define TK_DAYS 53
#define TK_CACHE 53
#define TK_NK_VARIABLE 54
#define TK_CACHELAST 54
#define TK_FSYNC 55
#define TK_COMP 55
#define TK_MAXROWS 56
#define TK_DAYS 56
#define TK_MINROWS 57
#define TK_NK_VARIABLE 57
#define TK_KEEP 58
#define TK_FSYNC 58
#define TK_PRECISION 59
#define TK_MAXROWS 59
#define TK_QUORUM 60
#define TK_MINROWS 60
#define TK_REPLICA 61
#define TK_KEEP 61
#define TK_TTL 62
#define TK_PRECISION 62
#define TK_WAL 63
#define TK_QUORUM 63
#define TK_VGROUPS 64
#define TK_REPLICA 64
#define TK_SINGLE_STABLE 65
#define TK_TTL 65
#define TK_STREAM_MODE 66
#define TK_WAL 66
#define TK_RETENTIONS 67
#define TK_VGROUPS 67
#define TK_NK_COMMA 68
#define TK_SINGLE_STABLE 68
#define TK_NK_COLON 69
#define TK_STREAM_MODE 69
#define TK_TABLE 70
#define TK_RETENTIONS 70
#define TK_NK_LP 71
#define TK_NK_COMMA 71
#define TK_NK_RP 72
#define TK_NK_COLON 72
#define TK_STABLE 73
#define TK_TABLE 73
#define TK_ADD 74
#define TK_NK_LP 74
#define TK_COLUMN 75
#define TK_NK_RP 75
#define TK_MODIFY 76
#define TK_STABLE 76
#define TK_RENAME 77
#define TK_ADD 77
#define TK_TAG 78
#define TK_COLUMN 78
#define TK_SET 79
#define TK_MODIFY 79
#define TK_NK_EQ 80
#define TK_RENAME 80
#define TK_USING 81
#define TK_TAG 81
#define TK_TAGS 82
#define TK_SET 82
#define TK_NK_DOT 83
#define TK_NK_EQ 83
#define TK_COMMENT 84
#define TK_USING 84
#define TK_BOOL 85
#define TK_TAGS 85
#define TK_TINYINT 86
#define TK_NK_DOT 86
#define TK_SMALLINT 87
#define TK_COMMENT 87
#define TK_INT 88
#define TK_BOOL 88
#define TK_INTEGER 89
#define TK_TINYINT 89
#define TK_BIGINT 90
#define TK_SMALLINT 90
#define TK_FLOAT 91
#define TK_INT 91
#define TK_DOUBLE 92
#define TK_INTEGER 92
#define TK_BINARY 93
#define TK_BIGINT 93
#define TK_TIMESTAMP 94
#define TK_FLOAT 94
#define TK_NCHAR 95
#define TK_DOUBLE 95
#define TK_UNSIGNED 96
#define TK_BINARY 96
#define TK_JSON 97
#define TK_TIMESTAMP 97
#define TK_VARCHAR 98
#define TK_NCHAR 98
#define TK_MEDIUMBLOB 99
#define TK_UNSIGNED 99
#define TK_BLOB 100
#define TK_JSON 100
#define TK_VARBINARY 101
#define TK_VARCHAR 101
#define TK_DECIMAL 102
#define TK_MEDIUMBLOB 102
#define TK_SMA 103
#define TK_BLOB 103
#define TK_ROLLUP 104
#define TK_VARBINARY 104
#define TK_FILE_FACTOR 105
#define TK_DECIMAL 105
#define TK_NK_FLOAT 106
#define TK_SMA 106
#define TK_DELAY 107
#define TK_ROLLUP 107
#define TK_SHOW 108
#define TK_FILE_FACTOR 108
#define TK_DATABASES 109
#define TK_NK_FLOAT 109
#define TK_TABLES 110
#define TK_DELAY 110
#define TK_STABLES 111
#define TK_SHOW 111
#define TK_MNODES 112
#define TK_DATABASES 112
#define TK_MODULES 113
#define TK_TABLES 113
#define TK_QNODES 114
#define TK_STABLES 114
#define TK_FUNCTIONS 115
#define TK_MNODES 115
#define TK_INDEXES 116
#define TK_MODULES 116
#define TK_FROM 117
#define TK_QNODES 117
#define TK_ACCOUNTS 118
#define TK_FUNCTIONS 118
#define TK_APPS 119
#define TK_INDEXES 119
#define TK_CONNECTIONS 120
#define TK_FROM 120
#define TK_LICENCE 121
#define TK_ACCOUNTS 121
#define TK_QUERIES 122
#define TK_APPS 122
#define TK_SCORES 123
#define TK_CONNECTIONS 123
#define TK_TOPICS 124
#define TK_LICENCE 124
#define TK_VARIABLES 125
#define TK_QUERIES 125
#define TK_LIKE 126
#define TK_SCORES 126
#define TK_INDEX 127
#define TK_TOPICS 127
#define TK_FULLTEXT 128
#define TK_VARIABLES 128
#define TK_FUNCTION 129
#define TK_BNODES 129
#define TK_INTERVAL 130
#define TK_SNODES 130
#define TK_TOPIC 131
#define TK_LIKE 131
#define TK_AS 132
#define TK_INDEX 132
#define TK_DESC 133
#define TK_FULLTEXT 133
#define TK_DESCRIBE 134
#define TK_FUNCTION 134
#define TK_RESET 135
#define TK_INTERVAL 135
#define TK_QUERY 136
#define TK_TOPIC 136
#define TK_EXPLAIN 137
#define TK_AS 137
#define TK_ANALYZE 138
#define TK_DESC 138
#define TK_VERBOSE 139
#define TK_DESCRIBE 139
#define TK_NK_BOOL 140
#define TK_RESET 140
#define TK_RATIO 141
#define TK_QUERY 141
#define TK_COMPACT 142
#define TK_EXPLAIN 142
#define TK_VNODES 143
#define TK_ANALYZE 143
#define TK_IN 144
#define TK_VERBOSE 144
#define TK_OUTPUTTYPE 145
#define TK_NK_BOOL 145
#define TK_AGGREGATE 146
#define TK_RATIO 146
#define TK_BUFSIZE 147
#define TK_COMPACT 147
#define TK_STREAM 148
#define TK_VNODES 148
#define TK_INTO 149
#define TK_IN 149
#define TK_KILL 150
#define TK_OUTPUTTYPE 150
#define TK_CONNECTION 151
#define TK_AGGREGATE 151
#define TK_MERGE 152
#define TK_BUFSIZE 152
#define TK_VGROUP 153
#define TK_STREAM 153
#define TK_REDISTRIBUTE 154
#define TK_INTO 154
#define TK_SPLIT 155
#define TK_KILL 155
#define TK_SYNCDB 156
#define TK_CONNECTION 156
#define TK_NULL 157
#define TK_MERGE 157
#define TK_FIRST 158
#define TK_VGROUP 158
#define TK_LAST 159
#define TK_REDISTRIBUTE 159
#define TK_NOW 160
#define TK_SPLIT 160
#define TK_ROWTS 161
#define TK_SYNCDB 161
#define TK_TBNAME 162
#define TK_NULL 162
#define TK_QSTARTTS 163
#define TK_FIRST 163
#define TK_QENDTS 164
#define TK_LAST 164
#define TK_WSTARTTS 165
#define TK_NOW 165
#define TK_WENDTS 166
#define TK_ROWTS 166
#define TK_WDURATION 167
#define TK_TBNAME 167
#define TK_BETWEEN 168
#define TK_QSTARTTS 168
#define TK_IS 169
#define TK_QENDTS 169
#define TK_NK_LT 170
#define TK_WSTARTTS 170
#define TK_NK_GT 171
#define TK_WENDTS 171
#define TK_NK_LE 172
#define TK_WDURATION 172
#define TK_NK_GE 173
#define TK_BETWEEN 173
#define TK_NK_NE 174
#define TK_IS 174
#define TK_MATCH 175
#define TK_NK_LT 175
#define TK_NMATCH 176
#define TK_NK_GT 176
#define TK_JOIN 177
#define TK_NK_LE 177
#define TK_INNER 178
#define TK_NK_GE 178
#define TK_SELECT 179
#define TK_NK_NE 179
#define TK_DISTINCT 180
#define TK_MATCH 180
#define TK_WHERE 181
#define TK_NMATCH 181
#define TK_PARTITION 182
#define TK_JOIN 182
#define TK_BY 183
#define TK_INNER 183
#define TK_SESSION 184
#define TK_SELECT 184
#define TK_STATE_WINDOW 185
#define TK_DISTINCT 185
#define TK_SLIDING 186
#define TK_WHERE 186
#define TK_FILL 187
#define TK_PARTITION 187
#define TK_VALUE 188
#define TK_BY 188
#define TK_NONE 189
#define TK_SESSION 189
#define TK_PREV 190
#define TK_STATE_WINDOW 190
#define TK_LINEAR 191
#define TK_SLIDING 191
#define TK_NEXT 192
#define TK_FILL 192
#define TK_GROUP 193
#define TK_VALUE 193
#define TK_HAVING 194
#define TK_NONE 194
#define TK_ORDER 195
#define TK_PREV 195
#define TK_SLIMIT 196
#define TK_LINEAR 196
#define TK_SOFFSET 197
#define TK_NEXT 197
#define TK_LIMIT 198
#define TK_GROUP 198
#define TK_OFFSET 199
#define TK_HAVING 199
#define TK_ASC 200
#define TK_ORDER 200
#define TK_NULLS 201
#define TK_SLIMIT 201
#define TK_SOFFSET 202
#define TK_LIMIT 203
#define TK_OFFSET 204
#define TK_ASC 205
#define TK_NULLS 206
#define TK_NK_SPACE 300
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
#define TK_NK_COMMENT 301
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
f18cdd5f
...
@@ -230,15 +230,15 @@ typedef struct SDropIndexStmt {
...
@@ -230,15 +230,15 @@ typedef struct SDropIndexStmt {
char
tableName
[
TSDB_TABLE_NAME_LEN
];
char
tableName
[
TSDB_TABLE_NAME_LEN
];
}
SDropIndexStmt
;
}
SDropIndexStmt
;
typedef
struct
SCreate
Qn
odeStmt
{
typedef
struct
SCreate
ComponentN
odeStmt
{
ENodeType
type
;
ENodeType
type
;
int32_t
dnodeId
;
int32_t
dnodeId
;
}
SCreate
Qn
odeStmt
;
}
SCreate
ComponentN
odeStmt
;
typedef
struct
SDrop
Qn
odeStmt
{
typedef
struct
SDrop
ComponentN
odeStmt
{
ENodeType
type
;
ENodeType
type
;
int32_t
dnodeId
;
int32_t
dnodeId
;
}
SDrop
Qn
odeStmt
;
}
SDrop
ComponentN
odeStmt
;
typedef
struct
SCreateTopicStmt
{
typedef
struct
SCreateTopicStmt
{
ENodeType
type
;
ENodeType
type
;
...
...
include/libs/nodes/nodes.h
浏览文件 @
f18cdd5f
...
@@ -105,6 +105,12 @@ typedef enum ENodeType {
...
@@ -105,6 +105,12 @@ typedef enum ENodeType {
QUERY_NODE_DROP_INDEX_STMT
,
QUERY_NODE_DROP_INDEX_STMT
,
QUERY_NODE_CREATE_QNODE_STMT
,
QUERY_NODE_CREATE_QNODE_STMT
,
QUERY_NODE_DROP_QNODE_STMT
,
QUERY_NODE_DROP_QNODE_STMT
,
QUERY_NODE_CREATE_BNODE_STMT
,
QUERY_NODE_DROP_BNODE_STMT
,
QUERY_NODE_CREATE_SNODE_STMT
,
QUERY_NODE_DROP_SNODE_STMT
,
QUERY_NODE_CREATE_MNODE_STMT
,
QUERY_NODE_DROP_MNODE_STMT
,
QUERY_NODE_CREATE_TOPIC_STMT
,
QUERY_NODE_CREATE_TOPIC_STMT
,
QUERY_NODE_DROP_TOPIC_STMT
,
QUERY_NODE_DROP_TOPIC_STMT
,
QUERY_NODE_ALTER_LOCAL_STMT
,
QUERY_NODE_ALTER_LOCAL_STMT
,
...
@@ -142,6 +148,8 @@ typedef enum ENodeType {
...
@@ -142,6 +148,8 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_SCORES_STMT
,
QUERY_NODE_SHOW_SCORES_STMT
,
QUERY_NODE_SHOW_TOPICS_STMT
,
QUERY_NODE_SHOW_TOPICS_STMT
,
QUERY_NODE_SHOW_VARIABLE_STMT
,
QUERY_NODE_SHOW_VARIABLE_STMT
,
QUERY_NODE_SHOW_BNODES_STMT
,
QUERY_NODE_SHOW_SNODES_STMT
,
QUERY_NODE_KILL_CONNECTION_STMT
,
QUERY_NODE_KILL_CONNECTION_STMT
,
QUERY_NODE_KILL_QUERY_STMT
,
QUERY_NODE_KILL_QUERY_STMT
,
...
...
include/util/tdef.h
浏览文件 @
f18cdd5f
...
@@ -108,6 +108,8 @@ extern const int32_t TYPE_BYTES[15];
...
@@ -108,6 +108,8 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
#define TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED "user_table_distributed"
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_USER_USERS "user_users"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_TABLE_VGROUPS "vgroups"
#define TSDB_INS_TABLE_BNODES "bnodes"
#define TSDB_INS_TABLE_SNODES "snodes"
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f18cdd5f
...
@@ -296,7 +296,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -296,7 +296,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
STqTopic
*
pTopic
=
NULL
;
STqTopic
*
pTopic
=
NULL
;
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int
32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STqTopic
*
topic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
STqTopic
*
topic
=
taosArrayGet
(
pConsumer
->
topics
,
i
);
// TODO race condition
// TODO race condition
...
@@ -316,7 +316,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -316,7 +316,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return
0
;
return
0
;
}
}
vDebug
(
"poll topic %s from consumer %ld (epoch %d) vg %d"
,
pTopic
->
topicName
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
);
vDebug
(
"poll topic %s from consumer %ld (epoch %d) vg %d"
,
pTopic
->
topicName
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
);
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
skipLogNum
=
0
;
rsp
.
skipLogNum
=
0
;
...
@@ -326,7 +327,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -326,7 +327,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO
// TODO
consumerEpoch
=
atomic_load_32
(
&
pConsumer
->
epoch
);
consumerEpoch
=
atomic_load_32
(
&
pConsumer
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
if
(
consumerEpoch
>
reqEpoch
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerEpoch
,
reqEpoch
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerEpoch
,
reqEpoch
);
break
;
break
;
}
}
SWalReadHead
*
pHead
;
SWalReadHead
*
pHead
;
...
@@ -334,10 +336,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -334,10 +336,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// if data inserted during waiting, launch query and
// response to user
// response to user
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
break
;
break
;
}
}
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
...
@@ -361,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -361,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
fetchOffset
++
;
fetchOffset
++
;
rsp
.
skipLogNum
++
;
rsp
.
skipLogNum
++
;
taosArrayDestroy
(
pRes
);
taosArrayDestroy
(
pRes
);
...
@@ -390,7 +395,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -390,7 +395,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
pCont
=
buf
;
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
code
=
0
;
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
tmsgSendRsp
(
pMsg
);
tmsgSendRsp
(
pMsg
);
taosMemoryFree
(
pHead
);
taosMemoryFree
(
pHead
);
return
0
;
return
0
;
...
@@ -421,7 +427,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -421,7 +427,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
contLen
=
tlen
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
tmsgSendRsp
(
pMsg
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
/*}*/
/*}*/
return
0
;
return
0
;
...
@@ -432,7 +439,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
...
@@ -432,7 +439,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
terrno
=
TSDB_CODE_SUCCESS
;
terrno
=
TSDB_CODE_SUCCESS
;
tDecodeSMqMVRebReq
(
msg
,
&
req
);
tDecodeSMqMVRebReq
(
msg
,
&
req
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
ASSERT
(
pConsumer
);
ASSERT
(
pConsumer
->
consumerId
==
req
.
oldConsumerId
);
ASSERT
(
pConsumer
->
consumerId
==
req
.
oldConsumerId
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
f18cdd5f
...
@@ -18,14 +18,14 @@
...
@@ -18,14 +18,14 @@
#include "executorimpl.h"
#include "executorimpl.h"
#include "planner.h"
#include "planner.h"
#include "tcompression.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tglobal.h"
#include "tqueue.h"
#include "tqueue.h"
#include "tdatablock.h"
typedef
struct
SDataDispatchBuf
{
typedef
struct
SDataDispatchBuf
{
int32_t
useSize
;
int32_t
useSize
;
int32_t
allocSize
;
int32_t
allocSize
;
char
*
pData
;
char
*
pData
;
}
SDataDispatchBuf
;
}
SDataDispatchBuf
;
typedef
struct
SDataCacheEntry
{
typedef
struct
SDataCacheEntry
{
...
@@ -36,26 +36,25 @@ typedef struct SDataCacheEntry {
...
@@ -36,26 +36,25 @@ typedef struct SDataCacheEntry {
}
SDataCacheEntry
;
}
SDataCacheEntry
;
typedef
struct
SDataDispatchHandle
{
typedef
struct
SDataDispatchHandle
{
SDataSinkHandle
sink
;
SDataSinkHandle
sink
;
SDataSinkManager
*
pManager
;
SDataSinkManager
*
pManager
;
SDataBlockDescNode
*
pSchema
;
SDataBlockDescNode
*
pSchema
;
STaosQueue
*
pDataBlocks
;
STaosQueue
*
pDataBlocks
;
SDataDispatchBuf
nextOutput
;
SDataDispatchBuf
nextOutput
;
int32_t
status
;
int32_t
status
;
bool
queryEnd
;
bool
queryEnd
;
uint64_t
useconds
;
uint64_t
useconds
;
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
}
SDataDispatchHandle
;
}
SDataDispatchHandle
;
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
const
SDataBlockDescNode
*
pSchema
)
{
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
int32_t
numOfCols
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
return
false
;
return
false
;
}
}
int32_t
numOfCols
=
LIST_LENGTH
(
pSchema
->
pSlots
);
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
return
true
;
return
true
;
}
}
...
@@ -64,51 +63,6 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
...
@@ -64,51 +63,6 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
return
false
;
return
false
;
}
}
static
int32_t
compressColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
int32_t
colSize
=
colDataGetLength
(
pColRes
,
numOfRows
);
return
(
*
(
tDataTypes
[
pColRes
->
info
.
type
].
compFunc
))(
pColRes
->
pData
,
colSize
,
numOfRows
,
data
,
colSize
+
COMP_OVERFLOW_BYTES
,
compressed
,
NULL
,
0
);
}
static
void
copyData
(
const
SInputData
*
pInput
,
const
SDataBlockDescNode
*
pSchema
,
char
*
data
,
int8_t
compressed
,
int32_t
*
dataLen
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pSchema
->
pSlots
);
int32_t
*
colSizes
=
(
int32_t
*
)
data
;
data
+=
numOfCols
*
sizeof
(
int32_t
);
*
dataLen
=
(
numOfCols
*
sizeof
(
int32_t
));
int32_t
numOfRows
=
pInput
->
pData
->
info
.
rows
;
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pInput
->
pData
->
pDataBlock
,
col
);
// copy the null bitmap
if
(
IS_VAR_DATA_TYPE
(
pColRes
->
info
.
type
))
{
size_t
metaSize
=
numOfRows
*
sizeof
(
int32_t
);
memcpy
(
data
,
pColRes
->
varmeta
.
offset
,
metaSize
);
data
+=
metaSize
;
(
*
dataLen
)
+=
metaSize
;
}
else
{
int32_t
len
=
BitmapLen
(
numOfRows
);
memcpy
(
data
,
pColRes
->
nullbitmap
,
len
);
data
+=
len
;
(
*
dataLen
)
+=
len
;
}
if
(
compressed
)
{
colSizes
[
col
]
=
compressColData
(
pColRes
,
numOfRows
,
data
,
compressed
);
data
+=
colSizes
[
col
];
(
*
dataLen
)
+=
colSizes
[
col
];
}
else
{
colSizes
[
col
]
=
colDataGetLength
(
pColRes
,
numOfRows
);
(
*
dataLen
)
+=
colSizes
[
col
];
memmove
(
data
,
pColRes
->
pData
,
colSizes
[
col
]);
data
+=
colSizes
[
col
];
}
colSizes
[
col
]
=
htonl
(
colSizes
[
col
]);
}
}
// data format:
// data format:
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
...
@@ -117,16 +71,17 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
...
@@ -117,16 +71,17 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
// recorded in the first segment, next to the struct header
static
void
toDataCacheEntry
(
const
SDataDispatchHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
static
void
toDataCacheEntry
(
const
SDataDispatchHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pHandle
->
pSchema
->
pSlots
);
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
pEntry
->
compressed
=
(
int8_t
)
needCompress
(
pInput
->
pData
,
pHandle
->
pSchema
);
pEntry
->
compressed
=
(
int8_t
)
needCompress
(
pInput
->
pData
,
numOfCols
);
pEntry
->
numOfRows
=
pInput
->
pData
->
info
.
rows
;
pEntry
->
numOfRows
=
pInput
->
pData
->
info
.
rows
;
pEntry
->
dataLen
=
0
;
pEntry
->
dataLen
=
0
;
pBuf
->
useSize
=
sizeof
(
SRetrieveTableRsp
);
pBuf
->
useSize
=
sizeof
(
SRetrieveTableRsp
);
copyData
(
pInput
,
pHandle
->
pSchema
,
pEntry
->
data
,
pEntry
->
compressed
,
&
pEntry
->
dataLen
);
blockCompressEncode
(
pInput
->
pData
,
pEntry
->
data
,
&
pEntry
->
dataLen
,
numOfCols
,
pEntry
->
compressed
);
pEntry
->
dataLen
=
pEntry
->
dataLen
;
pBuf
->
useSize
+=
pEntry
->
dataLen
;
pBuf
->
useSize
+=
pEntry
->
dataLen
;
}
}
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
...
@@ -140,7 +95,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
...
@@ -140,7 +95,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
// NOTE: there are four bytes of an integer more than the required buffer space.
// NOTE: there are four bytes of an integer more than the required buffer space.
// struct size + data payload + length for each column + bitmap length
// struct size + data payload + length for each column + bitmap length
pBuf
->
allocSize
=
sizeof
(
SRetrieveTableRsp
)
+
blockDataGetSerialMetaSize
(
pInput
->
pData
)
+
pBuf
->
allocSize
=
sizeof
(
SRetrieveTableRsp
)
+
blockDataGetSerialMetaSize
(
pInput
->
pData
)
+
ceil
(
blockDataGetSerialRowSize
(
pInput
->
pData
)
*
pInput
->
pData
->
info
.
rows
);
ceil
(
blockDataGetSerialRowSize
(
pInput
->
pData
)
*
pInput
->
pData
->
info
.
rows
);
pBuf
->
pData
=
taosMemoryMalloc
(
pBuf
->
allocSize
);
pBuf
->
pData
=
taosMemoryMalloc
(
pBuf
->
allocSize
);
if
(
pBuf
->
pData
==
NULL
)
{
if
(
pBuf
->
pData
==
NULL
)
{
...
@@ -153,8 +108,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
...
@@ -153,8 +108,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static
int32_t
updateStatus
(
SDataDispatchHandle
*
pDispatcher
)
{
static
int32_t
updateStatus
(
SDataDispatchHandle
*
pDispatcher
)
{
taosThreadMutexLock
(
&
pDispatcher
->
mutex
);
taosThreadMutexLock
(
&
pDispatcher
->
mutex
);
int32_t
blockNums
=
taosQueueSize
(
pDispatcher
->
pDataBlocks
);
int32_t
blockNums
=
taosQueueSize
(
pDispatcher
->
pDataBlocks
);
int32_t
status
=
(
0
==
blockNums
?
DS_BUF_EMPTY
:
int32_t
status
=
(
blockNums
<
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_BUF_LOW
:
DS_BUF_FULL
));
(
0
==
blockNums
?
DS_BUF_EMPTY
:
(
blockNums
<
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_BUF_LOW
:
DS_BUF_FULL
));
pDispatcher
->
status
=
status
;
pDispatcher
->
status
=
status
;
taosThreadMutexUnlock
(
&
pDispatcher
->
mutex
);
taosThreadMutexUnlock
(
&
pDispatcher
->
mutex
);
return
status
;
return
status
;
...
@@ -169,7 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
...
@@ -169,7 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
SDataDispatchBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDispatchBuf
));
SDataDispatchBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDispatchBuf
));
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDispatcher
,
pInput
,
pBuf
))
{
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDispatcher
,
pInput
,
pBuf
))
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
@@ -200,7 +156,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
...
@@ -200,7 +156,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
memcpy
(
&
pDispatcher
->
nextOutput
,
pBuf
,
sizeof
(
SDataDispatchBuf
));
memcpy
(
&
pDispatcher
->
nextOutput
,
pBuf
,
sizeof
(
SDataDispatchBuf
));
taosFreeQitem
(
pBuf
);
taosFreeQitem
(
pBuf
);
*
pLen
=
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
dataLen
;
*
pLen
=
((
SDataCacheEntry
*
)(
pDispatcher
->
nextOutput
.
pData
))
->
dataLen
;
*
pQueryEnd
=
pDispatcher
->
queryEnd
;
*
pQueryEnd
=
pDispatcher
->
queryEnd
;
}
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f18cdd5f
...
@@ -4760,6 +4760,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
...
@@ -4760,6 +4760,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
)
{
SSDataBlock
*
getSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
)
{
blockDataCleanup
(
pDataBlock
);
blockDataCleanup
(
pDataBlock
);
blockDataEnsureCapacity
(
pDataBlock
,
capacity
);
blockDataEnsureCapacity
(
pDataBlock
,
capacity
);
blockDataEnsureCapacity
(
pDataBlock
,
capacity
);
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
f18cdd5f
...
@@ -106,7 +106,6 @@ static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
...
@@ -106,7 +106,6 @@ static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
COPY_CHAR_ARRAY_FIELD
(
tableName
);
COPY_CHAR_ARRAY_FIELD
(
tableName
);
COPY_CHAR_ARRAY_FIELD
(
tableAlias
);
COPY_CHAR_ARRAY_FIELD
(
tableAlias
);
COPY_CHAR_ARRAY_FIELD
(
colName
);
COPY_CHAR_ARRAY_FIELD
(
colName
);
// CLONE_NODE_FIELD(pProjectRef);
COPY_SCALAR_FIELD
(
dataBlockId
);
COPY_SCALAR_FIELD
(
dataBlockId
);
COPY_SCALAR_FIELD
(
slotId
);
COPY_SCALAR_FIELD
(
slotId
);
return
(
SNode
*
)
pDst
;
return
(
SNode
*
)
pDst
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
f18cdd5f
...
@@ -170,6 +170,10 @@ const char* nodesNodeName(ENodeType type) {
...
@@ -170,6 +170,10 @@ const char* nodesNodeName(ENodeType type) {
return
"LogicExchange"
;
return
"LogicExchange"
;
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
"LogicWindow"
;
return
"LogicWindow"
;
case
QUERY_NODE_LOGIC_PLAN_SORT
:
return
"LogicSort"
;
case
QUERY_NODE_LOGIC_PLAN_PARTITION
:
return
"LogicPartition"
;
case
QUERY_NODE_LOGIC_SUBPLAN
:
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
"LogicSubplan"
;
return
"LogicSubplan"
;
case
QUERY_NODE_LOGIC_PLAN
:
case
QUERY_NODE_LOGIC_PLAN
:
...
@@ -530,6 +534,30 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
...
@@ -530,6 +534,30 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
return
code
;
return
code
;
}
}
static
const
char
*
jkSortLogicPlanSortKeys
=
"SortKeys"
;
static
int32_t
logicSortNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSortLogicNode
*
pNode
=
(
const
SSortLogicNode
*
)
pObj
;
int32_t
code
=
logicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkSortLogicPlanSortKeys
,
pNode
->
pSortKeys
);
}
return
code
;
}
static
int32_t
jsonToLogicSortNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SSortLogicNode
*
pNode
=
(
SSortLogicNode
*
)
pObj
;
int32_t
code
=
jsonToLogicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkSortLogicPlanSortKeys
,
&
pNode
->
pSortKeys
);
}
return
code
;
}
static
const
char
*
jkJoinLogicPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinLogicPlanJoinType
=
"JoinType"
;
static
const
char
*
jkJoinLogicPlanOnConditions
=
"OnConditions"
;
static
const
char
*
jkJoinLogicPlanOnConditions
=
"OnConditions"
;
...
@@ -2468,6 +2496,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
...
@@ -2468,6 +2496,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
logicProjectNodeToJson
(
pObj
,
pJson
);
return
logicProjectNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
case
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
:
break
;
case
QUERY_NODE_LOGIC_PLAN_SORT
:
return
logicSortNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_LOGIC_SUBPLAN
:
case
QUERY_NODE_LOGIC_SUBPLAN
:
case
QUERY_NODE_LOGIC_PLAN
:
case
QUERY_NODE_LOGIC_PLAN
:
break
;
break
;
...
@@ -2527,16 +2558,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
...
@@ -2527,16 +2558,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToFunctionNode
(
pJson
,
pObj
);
return
jsonToFunctionNode
(
pJson
,
pObj
);
case
QUERY_NODE_REAL_TABLE
:
case
QUERY_NODE_REAL_TABLE
:
return
jsonToRealTableNode
(
pJson
,
pObj
);
return
jsonToRealTableNode
(
pJson
,
pObj
);
// case QUERY_NODE_TEMP_TABLE:
// case QUERY_NODE_JOIN_TABLE:
// break;
// case QUERY_NODE_GROUPING_SET:
// return jsonToGroupingSetNode(pJson, pObj);
case
QUERY_NODE_ORDER_BY_EXPR
:
case
QUERY_NODE_ORDER_BY_EXPR
:
return
jsonToOrderByExprNode
(
pJson
,
pObj
);
return
jsonToOrderByExprNode
(
pJson
,
pObj
);
// case QUERY_NODE_LIMIT:
// case QUERY_NODE_STATE_WINDOW:
// case QUERY_NODE_SESSION_WINDOW:
case
QUERY_NODE_INTERVAL_WINDOW
:
case
QUERY_NODE_INTERVAL_WINDOW
:
return
jsonToIntervalWindowNode
(
pJson
,
pObj
);
return
jsonToIntervalWindowNode
(
pJson
,
pObj
);
case
QUERY_NODE_NODE_LIST
:
case
QUERY_NODE_NODE_LIST
:
...
@@ -2545,28 +2568,22 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
...
@@ -2545,28 +2568,22 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return
jsonToFillNode
(
pJson
,
pObj
);
return
jsonToFillNode
(
pJson
,
pObj
);
case
QUERY_NODE_TARGET
:
case
QUERY_NODE_TARGET
:
return
jsonToTargetNode
(
pJson
,
pObj
);
return
jsonToTargetNode
(
pJson
,
pObj
);
// case QUERY_NODE_RAW_EXPR:
// break;
case
QUERY_NODE_DATABLOCK_DESC
:
case
QUERY_NODE_DATABLOCK_DESC
:
return
jsonToDataBlockDescNode
(
pJson
,
pObj
);
return
jsonToDataBlockDescNode
(
pJson
,
pObj
);
case
QUERY_NODE_SLOT_DESC
:
case
QUERY_NODE_SLOT_DESC
:
return
jsonToSlotDescNode
(
pJson
,
pObj
);
return
jsonToSlotDescNode
(
pJson
,
pObj
);
case
QUERY_NODE_DOWNSTREAM_SOURCE
:
case
QUERY_NODE_DOWNSTREAM_SOURCE
:
return
jsonToDownstreamSourceNode
(
pJson
,
pObj
);
return
jsonToDownstreamSourceNode
(
pJson
,
pObj
);
// case QUERY_NODE_SET_OPERATOR:
// break;
case
QUERY_NODE_SELECT_STMT
:
case
QUERY_NODE_SELECT_STMT
:
return
jsonToSelectStmt
(
pJson
,
pObj
);
return
jsonToSelectStmt
(
pJson
,
pObj
);
case
QUERY_NODE_CREATE_TOPIC_STMT
:
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
jsonToCreateTopicStmt
(
pJson
,
pObj
);
return
jsonToCreateTopicStmt
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
jsonToLogicScanNode
(
pJson
,
pObj
);
return
jsonToLogicScanNode
(
pJson
,
pObj
);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
// return jsonToLogicJoinNode(pJson, pObj);
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return jsonToLogicAggNode(pJson, pObj);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
jsonToLogicProjectNode
(
pJson
,
pObj
);
return
jsonToLogicProjectNode
(
pJson
,
pObj
);
case
QUERY_NODE_LOGIC_PLAN_SORT
:
return
jsonToLogicSortNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
:
return
jsonToPhysiTagScanNode
(
pJson
,
pObj
);
return
jsonToPhysiTagScanNode
(
pJson
,
pObj
);
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:
case
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
:
...
...
source/libs/nodes/src/nodesToSQLFuncs.c
浏览文件 @
f18cdd5f
...
@@ -39,8 +39,12 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
...
@@ -39,8 +39,12 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
}
else
if
(
colNode
->
tableName
[
0
])
{
}
else
if
(
colNode
->
tableName
[
0
])
{
*
len
+=
snprintf
(
buf
+
*
len
,
bufSize
-
*
len
,
"`%s`."
,
colNode
->
tableName
);
*
len
+=
snprintf
(
buf
+
*
len
,
bufSize
-
*
len
,
"`%s`."
,
colNode
->
tableName
);
}
}
*
len
+=
snprintf
(
buf
+
*
len
,
bufSize
-
*
len
,
"`%s`"
,
colNode
->
colName
);
if
(
colNode
->
tableAlias
[
0
])
{
*
len
+=
snprintf
(
buf
+
*
len
,
bufSize
-
*
len
,
"`%s`"
,
colNode
->
colName
);
}
else
{
*
len
+=
snprintf
(
buf
+
*
len
,
bufSize
-
*
len
,
"%s"
,
colNode
->
colName
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
f18cdd5f
...
@@ -127,9 +127,15 @@ SNodeptr nodesMakeNode(ENodeType type) {
...
@@ -127,9 +127,15 @@ SNodeptr nodesMakeNode(ENodeType type) {
case
QUERY_NODE_DROP_INDEX_STMT
:
case
QUERY_NODE_DROP_INDEX_STMT
:
return
makeNode
(
type
,
sizeof
(
SDropIndexStmt
));
return
makeNode
(
type
,
sizeof
(
SDropIndexStmt
));
case
QUERY_NODE_CREATE_QNODE_STMT
:
case
QUERY_NODE_CREATE_QNODE_STMT
:
return
makeNode
(
type
,
sizeof
(
SCreateQnodeStmt
));
case
QUERY_NODE_CREATE_BNODE_STMT
:
case
QUERY_NODE_CREATE_SNODE_STMT
:
case
QUERY_NODE_CREATE_MNODE_STMT
:
return
makeNode
(
type
,
sizeof
(
SCreateComponentNodeStmt
));
case
QUERY_NODE_DROP_QNODE_STMT
:
case
QUERY_NODE_DROP_QNODE_STMT
:
return
makeNode
(
type
,
sizeof
(
SDropQnodeStmt
));
case
QUERY_NODE_DROP_BNODE_STMT
:
case
QUERY_NODE_DROP_SNODE_STMT
:
case
QUERY_NODE_DROP_MNODE_STMT
:
return
makeNode
(
type
,
sizeof
(
SDropComponentNodeStmt
));
case
QUERY_NODE_CREATE_TOPIC_STMT
:
case
QUERY_NODE_CREATE_TOPIC_STMT
:
return
makeNode
(
type
,
sizeof
(
SCreateTopicStmt
));
return
makeNode
(
type
,
sizeof
(
SCreateTopicStmt
));
case
QUERY_NODE_DROP_TOPIC_STMT
:
case
QUERY_NODE_DROP_TOPIC_STMT
:
...
@@ -991,12 +997,18 @@ typedef struct SCollectColumnsCxt {
...
@@ -991,12 +997,18 @@ typedef struct SCollectColumnsCxt {
int32_t
errCode
;
int32_t
errCode
;
const
char
*
pTableAlias
;
const
char
*
pTableAlias
;
SNodeList
*
pCols
;
SNodeList
*
pCols
;
SHashObj
*
pCol
Id
Hash
;
SHashObj
*
pColHash
;
}
SCollectColumnsCxt
;
}
SCollectColumnsCxt
;
static
EDealRes
doCollect
(
SCollectColumnsCxt
*
pCxt
,
int32_t
id
,
SNode
*
pNode
)
{
static
EDealRes
doCollect
(
SCollectColumnsCxt
*
pCxt
,
SColumnNode
*
pCol
,
SNode
*
pNode
)
{
if
(
NULL
==
taosHashGet
(
pCxt
->
pColIdHash
,
&
id
,
sizeof
(
id
)))
{
char
name
[
TSDB_TABLE_NAME_LEN
+
TSDB_COL_NAME_LEN
];
pCxt
->
errCode
=
taosHashPut
(
pCxt
->
pColIdHash
,
&
id
,
sizeof
(
id
),
NULL
,
0
);
int32_t
len
=
0
;
if
(
'\0'
==
pCol
->
tableAlias
[
0
])
{
len
=
sprintf
(
name
,
"%s"
,
pCol
->
colName
);
}
len
=
sprintf
(
name
,
"%s.%s"
,
pCol
->
tableAlias
,
pCol
->
colName
);
if
(
NULL
==
taosHashGet
(
pCxt
->
pColHash
,
name
,
len
))
{
pCxt
->
errCode
=
taosHashPut
(
pCxt
->
pColHash
,
name
,
len
,
NULL
,
0
);
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
pCxt
->
errCode
=
nodesListAppend
(
pCxt
->
pCols
,
pNode
);
pCxt
->
errCode
=
nodesListAppend
(
pCxt
->
pCols
,
pNode
);
}
}
...
@@ -1009,9 +1021,8 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) {
...
@@ -1009,9 +1021,8 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) {
SCollectColumnsCxt
*
pCxt
=
(
SCollectColumnsCxt
*
)
pContext
;
SCollectColumnsCxt
*
pCxt
=
(
SCollectColumnsCxt
*
)
pContext
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
int32_t
colId
=
pCol
->
colId
;
if
(
NULL
==
pCxt
->
pTableAlias
||
0
==
strcmp
(
pCxt
->
pTableAlias
,
pCol
->
tableAlias
))
{
if
(
NULL
==
pCxt
->
pTableAlias
||
0
==
strcmp
(
pCxt
->
pTableAlias
,
pCol
->
tableAlias
))
{
return
doCollect
(
pCxt
,
colId
,
pNode
);
return
doCollect
(
pCxt
,
pCol
,
pNode
);
}
}
}
}
return
DEAL_RES_CONTINUE
;
return
DEAL_RES_CONTINUE
;
...
@@ -1026,14 +1037,14 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
...
@@ -1026,14 +1037,14 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
.
errCode
=
TSDB_CODE_SUCCESS
,
.
errCode
=
TSDB_CODE_SUCCESS
,
.
pTableAlias
=
pTableAlias
,
.
pTableAlias
=
pTableAlias
,
.
pCols
=
nodesMakeList
(),
.
pCols
=
nodesMakeList
(),
.
pCol
IdHash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
)
.
pCol
Hash
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
)
};
};
if
(
NULL
==
cxt
.
pCols
||
NULL
==
cxt
.
pCol
Id
Hash
)
{
if
(
NULL
==
cxt
.
pCols
||
NULL
==
cxt
.
pColHash
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
nodesWalkSelectStmt
(
pSelect
,
clause
,
collectColumns
,
&
cxt
);
nodesWalkSelectStmt
(
pSelect
,
clause
,
collectColumns
,
&
cxt
);
taosHashCleanup
(
cxt
.
pCol
Id
Hash
);
taosHashCleanup
(
cxt
.
pColHash
);
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
if
(
TSDB_CODE_SUCCESS
!=
cxt
.
errCode
)
{
nodesClearList
(
cxt
.
pCols
);
nodesClearList
(
cxt
.
pCols
);
return
cxt
.
errCode
;
return
cxt
.
errCode
;
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
f18cdd5f
...
@@ -150,8 +150,8 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
...
@@ -150,8 +150,8 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
SNode
*
createCreateIndexStmt
(
SAstCreateContext
*
pCxt
,
EIndexType
type
,
bool
ignoreExists
,
SToken
*
pIndexName
,
SToken
*
pTableName
,
SNodeList
*
pCols
,
SNode
*
pOptions
);
SNode
*
createCreateIndexStmt
(
SAstCreateContext
*
pCxt
,
EIndexType
type
,
bool
ignoreExists
,
SToken
*
pIndexName
,
SToken
*
pTableName
,
SNodeList
*
pCols
,
SNode
*
pOptions
);
SNode
*
createIndexOption
(
SAstCreateContext
*
pCxt
,
SNodeList
*
pFuncs
,
SNode
*
pInterval
,
SNode
*
pOffset
,
SNode
*
pSliding
);
SNode
*
createIndexOption
(
SAstCreateContext
*
pCxt
,
SNodeList
*
pFuncs
,
SNode
*
pInterval
,
SNode
*
pOffset
,
SNode
*
pSliding
);
SNode
*
createDropIndexStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
SToken
*
pIndexName
,
SToken
*
pTableName
);
SNode
*
createDropIndexStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
SToken
*
pIndexName
,
SToken
*
pTableName
);
SNode
*
createCreate
QnodeStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDnodeId
);
SNode
*
createCreate
ComponentNodeStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pDnodeId
);
SNode
*
createDrop
QnodeStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDnodeId
);
SNode
*
createDrop
ComponentNodeStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pDnodeId
);
SNode
*
createCreateTopicStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pTopicName
,
SNode
*
pQuery
,
const
SToken
*
pSubscribeDbName
);
SNode
*
createCreateTopicStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pTopicName
,
SNode
*
pQuery
,
const
SToken
*
pSubscribeDbName
);
SNode
*
createDropTopicStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pTopicName
);
SNode
*
createDropTopicStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pTopicName
);
SNode
*
createAlterLocalStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pConfig
,
const
SToken
*
pValue
);
SNode
*
createAlterLocalStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pConfig
,
const
SToken
*
pValue
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
f18cdd5f
...
@@ -109,8 +109,20 @@ cmd ::= ALTER LOCAL NK_STRING(A).
...
@@ -109,8 +109,20 @@ cmd ::= ALTER LOCAL NK_STRING(A).
cmd ::= ALTER LOCAL NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, &B); }
cmd ::= ALTER LOCAL NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, &B); }
/************************************************ create/drop qnode ***************************************************/
/************************************************ create/drop qnode ***************************************************/
cmd ::= CREATE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateQnodeStmt(pCxt, &A); }
cmd ::= CREATE QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_QNODE_STMT, &A); }
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropQnodeStmt(pCxt, &A); }
cmd ::= DROP QNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_QNODE_STMT, &A); }
/************************************************ create/drop bnode ***************************************************/
cmd ::= CREATE BNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_BNODE_STMT, &A); }
cmd ::= DROP BNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_BNODE_STMT, &A); }
/************************************************ create/drop snode ***************************************************/
cmd ::= CREATE SNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_SNODE_STMT, &A); }
cmd ::= DROP SNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_SNODE_STMT, &A); }
/************************************************ create/drop mnode ***************************************************/
cmd ::= CREATE MNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createCreateComponentNodeStmt(pCxt, QUERY_NODE_CREATE_MNODE_STMT, &A); }
cmd ::= DROP MNODE ON DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropComponentNodeStmt(pCxt, QUERY_NODE_DROP_MNODE_STMT, &A); }
/************************************************ create/drop/show/use database ***************************************/
/************************************************ create/drop/show/use database ***************************************/
cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C); }
cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C); }
...
@@ -327,6 +339,8 @@ cmd ::= SHOW QUERIES.
...
@@ -327,6 +339,8 @@ cmd ::= SHOW QUERIES.
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT, NULL, NULL); }
cmd ::= SHOW SCORES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SCORES_STMT, NULL, NULL); }
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT, NULL, NULL); }
cmd ::= SHOW TOPICS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TOPICS_STMT, NULL, NULL); }
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
...
@@ -714,10 +728,7 @@ select_list(A) ::= select_sublist(B).
...
@@ -714,10 +728,7 @@ select_list(A) ::= select_sublist(B).
select_sublist(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
select_sublist(A) ::= select_item(B). { A = createNodeList(pCxt, B); }
select_sublist(A) ::= select_sublist(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
select_sublist(A) ::= select_sublist(B) NK_COMMA select_item(C). { A = addNodeToList(pCxt, B, C); }
select_item(A) ::= common_expression(B). {
select_item(A) ::= common_expression(B). { A = releaseRawExprNode(pCxt, B); }
SToken t = getTokenFromRawExprNode(pCxt, B);
A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &t);
}
select_item(A) ::= common_expression(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
select_item(A) ::= common_expression(B) column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
select_item(A) ::= common_expression(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
select_item(A) ::= common_expression(B) AS column_alias(C). { A = setProjectionAlias(pCxt, releaseRawExprNode(pCxt, B), &C); }
select_item(A) ::= table_name(B) NK_DOT NK_STAR(C). { A = createColumnNode(pCxt, &B, &C); }
select_item(A) ::= table_name(B) NK_DOT NK_STAR(C). { A = createColumnNode(pCxt, &B, &C); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
f18cdd5f
...
@@ -205,9 +205,11 @@ SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const
...
@@ -205,9 +205,11 @@ SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const
SNode
*
releaseRawExprNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
)
{
SNode
*
releaseRawExprNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
)
{
CHECK_RAW_EXPR_NODE
(
pNode
);
CHECK_RAW_EXPR_NODE
(
pNode
);
SNode
*
tmp
=
((
SRawExprNode
*
)
pNode
)
->
pNode
;
SRawExprNode
*
pRawExpr
=
(
SRawExprNode
*
)
pNode
;
SNode
*
pExpr
=
pRawExpr
->
pNode
;
strncpy
(((
SExprNode
*
)
pExpr
)
->
aliasName
,
pRawExpr
->
p
,
pRawExpr
->
n
);
taosMemoryFreeClear
(
pNode
);
taosMemoryFreeClear
(
pNode
);
return
tmp
;
return
pExpr
;
}
}
SToken
getTokenFromRawExprNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
)
{
SToken
getTokenFromRawExprNode
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
)
{
...
@@ -1034,15 +1036,15 @@ SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
...
@@ -1034,15 +1036,15 @@ SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
return
(
SNode
*
)
pStmt
;
return
(
SNode
*
)
pStmt
;
}
}
SNode
*
createCreate
QnodeStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDnodeId
)
{
SNode
*
createCreate
ComponentNodeStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pDnodeId
)
{
SCreate
QnodeStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_CREATE_QNODE_STMT
);
SCreate
ComponentNodeStmt
*
pStmt
=
nodesMakeNode
(
type
);
CHECK_OUT_OF_MEM
(
pStmt
);
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
dnodeId
=
strtol
(
pDnodeId
->
z
,
NULL
,
10
);;
pStmt
->
dnodeId
=
strtol
(
pDnodeId
->
z
,
NULL
,
10
);;
return
(
SNode
*
)
pStmt
;
return
(
SNode
*
)
pStmt
;
}
}
SNode
*
createDrop
QnodeStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pDnodeId
)
{
SNode
*
createDrop
ComponentNodeStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pDnodeId
)
{
SDrop
QnodeStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_DROP_QNODE_STMT
);
SDrop
ComponentNodeStmt
*
pStmt
=
nodesMakeNode
(
type
);
CHECK_OUT_OF_MEM
(
pStmt
);
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
dnodeId
=
strtol
(
pDnodeId
->
z
,
NULL
,
10
);;
pStmt
->
dnodeId
=
strtol
(
pDnodeId
->
z
,
NULL
,
10
);;
return
(
SNode
*
)
pStmt
;
return
(
SNode
*
)
pStmt
;
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
f18cdd5f
...
@@ -43,6 +43,8 @@ static SKeyword keywordTable[] = {
...
@@ -43,6 +43,8 @@ static SKeyword keywordTable[] = {
{
"BINARY"
,
TK_BINARY
},
{
"BINARY"
,
TK_BINARY
},
{
"BIGINT"
,
TK_BIGINT
},
{
"BIGINT"
,
TK_BIGINT
},
{
"BLOCKS"
,
TK_BLOCKS
},
{
"BLOCKS"
,
TK_BLOCKS
},
{
"BNODE"
,
TK_BNODE
},
{
"BNODES"
,
TK_BNODES
},
{
"BOOL"
,
TK_BOOL
},
{
"BOOL"
,
TK_BOOL
},
{
"BUFSIZE"
,
TK_BUFSIZE
},
{
"BUFSIZE"
,
TK_BUFSIZE
},
{
"BY"
,
TK_BY
},
{
"BY"
,
TK_BY
},
...
@@ -106,6 +108,7 @@ static SKeyword keywordTable[] = {
...
@@ -106,6 +108,7 @@ static SKeyword keywordTable[] = {
{
"MAXROWS"
,
TK_MAXROWS
},
{
"MAXROWS"
,
TK_MAXROWS
},
{
"MINROWS"
,
TK_MINROWS
},
{
"MINROWS"
,
TK_MINROWS
},
{
"MINUS"
,
TK_MINUS
},
{
"MINUS"
,
TK_MINUS
},
{
"MNODE"
,
TK_MNODE
},
{
"MNODES"
,
TK_MNODES
},
{
"MNODES"
,
TK_MNODES
},
{
"MODIFY"
,
TK_MODIFY
},
{
"MODIFY"
,
TK_MODIFY
},
{
"MODULES"
,
TK_MODULES
},
{
"MODULES"
,
TK_MODULES
},
...
@@ -152,6 +155,8 @@ static SKeyword keywordTable[] = {
...
@@ -152,6 +155,8 @@ static SKeyword keywordTable[] = {
{
"SLIMIT"
,
TK_SLIMIT
},
{
"SLIMIT"
,
TK_SLIMIT
},
{
"SMA"
,
TK_SMA
},
{
"SMA"
,
TK_SMA
},
{
"SMALLINT"
,
TK_SMALLINT
},
{
"SMALLINT"
,
TK_SMALLINT
},
{
"SNODE"
,
TK_SNODE
},
{
"SNODES"
,
TK_SNODES
},
{
"SOFFSET"
,
TK_SOFFSET
},
{
"SOFFSET"
,
TK_SOFFSET
},
{
"STABLE"
,
TK_STABLE
},
{
"STABLE"
,
TK_STABLE
},
{
"STABLES"
,
TK_STABLES
},
{
"STABLES"
,
TK_STABLES
},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
f18cdd5f
...
@@ -2034,7 +2034,23 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt
...
@@ -2034,7 +2034,23 @@ static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
translateCreateQnode
(
STranslateContext
*
pCxt
,
SCreateQnodeStmt
*
pStmt
)
{
static
int16_t
getCreateComponentNodeMsgType
(
ENodeType
type
)
{
switch
(
type
)
{
case
QUERY_NODE_CREATE_QNODE_STMT
:
return
TDMT_MND_CREATE_QNODE
;
case
QUERY_NODE_CREATE_BNODE_STMT
:
return
TDMT_MND_CREATE_BNODE
;
case
QUERY_NODE_CREATE_SNODE_STMT
:
return
TDMT_MND_CREATE_SNODE
;
case
QUERY_NODE_CREATE_MNODE_STMT
:
return
TDMT_MND_CREATE_MNODE
;
default:
break
;
}
return
-
1
;
}
static
int32_t
translateCreateComponentNode
(
STranslateContext
*
pCxt
,
SCreateComponentNodeStmt
*
pStmt
)
{
SMCreateQnodeReq
createReq
=
{
.
dnodeId
=
pStmt
->
dnodeId
};
SMCreateQnodeReq
createReq
=
{
.
dnodeId
=
pStmt
->
dnodeId
};
pCxt
->
pCmdMsg
=
taosMemoryMalloc
(
sizeof
(
SCmdMsgInfo
));
pCxt
->
pCmdMsg
=
taosMemoryMalloc
(
sizeof
(
SCmdMsgInfo
));
...
@@ -2042,8 +2058,8 @@ static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* p
...
@@ -2042,8 +2058,8 @@ static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* p
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
pCxt
->
pCmdMsg
->
epSet
=
pCxt
->
pParseCxt
->
mgmtEpSet
;
pCxt
->
pCmdMsg
->
epSet
=
pCxt
->
pParseCxt
->
mgmtEpSet
;
pCxt
->
pCmdMsg
->
msgType
=
TDMT_MND_CREATE_QNODE
;
pCxt
->
pCmdMsg
->
msgType
=
getCreateComponentNodeMsgType
(
nodeType
(
pStmt
))
;
pCxt
->
pCmdMsg
->
msgLen
=
tSerializeS
CreateDropM
QSBNodeReq
(
NULL
,
0
,
&
createReq
);
pCxt
->
pCmdMsg
->
msgLen
=
tSerializeS
MCreateDrop
QSBNodeReq
(
NULL
,
0
,
&
createReq
);
pCxt
->
pCmdMsg
->
pMsg
=
taosMemoryMalloc
(
pCxt
->
pCmdMsg
->
msgLen
);
pCxt
->
pCmdMsg
->
pMsg
=
taosMemoryMalloc
(
pCxt
->
pCmdMsg
->
msgLen
);
if
(
NULL
==
pCxt
->
pCmdMsg
->
pMsg
)
{
if
(
NULL
==
pCxt
->
pCmdMsg
->
pMsg
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -2053,7 +2069,23 @@ static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* p
...
@@ -2053,7 +2069,23 @@ static int32_t translateCreateQnode(STranslateContext* pCxt, SCreateQnodeStmt* p
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
translateDropQnode
(
STranslateContext
*
pCxt
,
SDropQnodeStmt
*
pStmt
)
{
static
int16_t
getDropComponentNodeMsgType
(
ENodeType
type
)
{
switch
(
type
)
{
case
QUERY_NODE_DROP_QNODE_STMT
:
return
TDMT_MND_DROP_QNODE
;
case
QUERY_NODE_DROP_BNODE_STMT
:
return
TDMT_MND_DROP_BNODE
;
case
QUERY_NODE_DROP_SNODE_STMT
:
return
TDMT_MND_DROP_SNODE
;
case
QUERY_NODE_DROP_MNODE_STMT
:
return
TDMT_MND_DROP_MNODE
;
default:
break
;
}
return
-
1
;
}
static
int32_t
translateDropComponentNode
(
STranslateContext
*
pCxt
,
SDropComponentNodeStmt
*
pStmt
)
{
SDDropQnodeReq
dropReq
=
{
.
dnodeId
=
pStmt
->
dnodeId
};
SDDropQnodeReq
dropReq
=
{
.
dnodeId
=
pStmt
->
dnodeId
};
pCxt
->
pCmdMsg
=
taosMemoryMalloc
(
sizeof
(
SCmdMsgInfo
));
pCxt
->
pCmdMsg
=
taosMemoryMalloc
(
sizeof
(
SCmdMsgInfo
));
...
@@ -2061,8 +2093,8 @@ static int32_t translateDropQnode(STranslateContext* pCxt, SDropQnodeStmt* pStmt
...
@@ -2061,8 +2093,8 @@ static int32_t translateDropQnode(STranslateContext* pCxt, SDropQnodeStmt* pStmt
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
pCxt
->
pCmdMsg
->
epSet
=
pCxt
->
pParseCxt
->
mgmtEpSet
;
pCxt
->
pCmdMsg
->
epSet
=
pCxt
->
pParseCxt
->
mgmtEpSet
;
pCxt
->
pCmdMsg
->
msgType
=
TDMT_MND_DROP_QNODE
;
pCxt
->
pCmdMsg
->
msgType
=
getDropComponentNodeMsgType
(
nodeType
(
pStmt
))
;
pCxt
->
pCmdMsg
->
msgLen
=
tSerializeS
CreateDropM
QSBNodeReq
(
NULL
,
0
,
&
dropReq
);
pCxt
->
pCmdMsg
->
msgLen
=
tSerializeS
MCreateDrop
QSBNodeReq
(
NULL
,
0
,
&
dropReq
);
pCxt
->
pCmdMsg
->
pMsg
=
taosMemoryMalloc
(
pCxt
->
pCmdMsg
->
msgLen
);
pCxt
->
pCmdMsg
->
pMsg
=
taosMemoryMalloc
(
pCxt
->
pCmdMsg
->
msgLen
);
if
(
NULL
==
pCxt
->
pCmdMsg
->
pMsg
)
{
if
(
NULL
==
pCxt
->
pCmdMsg
->
pMsg
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -2226,10 +2258,16 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
...
@@ -2226,10 +2258,16 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
code
=
translateDropIndex
(
pCxt
,
(
SDropIndexStmt
*
)
pNode
);
code
=
translateDropIndex
(
pCxt
,
(
SDropIndexStmt
*
)
pNode
);
break
;
break
;
case
QUERY_NODE_CREATE_QNODE_STMT
:
case
QUERY_NODE_CREATE_QNODE_STMT
:
code
=
translateCreateQnode
(
pCxt
,
(
SCreateQnodeStmt
*
)
pNode
);
case
QUERY_NODE_CREATE_BNODE_STMT
:
case
QUERY_NODE_CREATE_SNODE_STMT
:
case
QUERY_NODE_CREATE_MNODE_STMT
:
code
=
translateCreateComponentNode
(
pCxt
,
(
SCreateComponentNodeStmt
*
)
pNode
);
break
;
break
;
case
QUERY_NODE_DROP_QNODE_STMT
:
case
QUERY_NODE_DROP_QNODE_STMT
:
code
=
translateDropQnode
(
pCxt
,
(
SDropQnodeStmt
*
)
pNode
);
case
QUERY_NODE_DROP_BNODE_STMT
:
case
QUERY_NODE_DROP_SNODE_STMT
:
case
QUERY_NODE_DROP_MNODE_STMT
:
code
=
translateDropComponentNode
(
pCxt
,
(
SDropComponentNodeStmt
*
)
pNode
);
break
;
break
;
case
QUERY_NODE_CREATE_TOPIC_STMT
:
case
QUERY_NODE_CREATE_TOPIC_STMT
:
code
=
translateCreateTopic
(
pCxt
,
(
SCreateTopicStmt
*
)
pNode
);
code
=
translateCreateTopic
(
pCxt
,
(
SCreateTopicStmt
*
)
pNode
);
...
@@ -2385,6 +2423,10 @@ static const char* getSysTableName(ENodeType type) {
...
@@ -2385,6 +2423,10 @@ static const char* getSysTableName(ENodeType type) {
return
TSDB_INS_TABLE_USER_INDEXES
;
return
TSDB_INS_TABLE_USER_INDEXES
;
case
QUERY_NODE_SHOW_STREAMS_STMT
:
case
QUERY_NODE_SHOW_STREAMS_STMT
:
return
TSDB_INS_TABLE_USER_STREAMS
;
return
TSDB_INS_TABLE_USER_STREAMS
;
case
QUERY_NODE_SHOW_BNODES_STMT
:
return
TSDB_INS_TABLE_BNODES
;
case
QUERY_NODE_SHOW_SNODES_STMT
:
return
TSDB_INS_TABLE_SNODES
;
default:
default:
break
;
break
;
}
}
...
@@ -2898,6 +2940,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
...
@@ -2898,6 +2940,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case
QUERY_NODE_SHOW_FUNCTIONS_STMT
:
case
QUERY_NODE_SHOW_FUNCTIONS_STMT
:
case
QUERY_NODE_SHOW_INDEXES_STMT
:
case
QUERY_NODE_SHOW_INDEXES_STMT
:
case
QUERY_NODE_SHOW_STREAMS_STMT
:
case
QUERY_NODE_SHOW_STREAMS_STMT
:
case
QUERY_NODE_SHOW_BNODES_STMT
:
case
QUERY_NODE_SHOW_SNODES_STMT
:
code
=
rewriteShow
(
pCxt
,
pQuery
);
code
=
rewriteShow
(
pCxt
,
pQuery
);
break
;
break
;
case
QUERY_NODE_CREATE_TABLE_STMT
:
case
QUERY_NODE_CREATE_TABLE_STMT
:
...
...
source/libs/parser/src/sql.c
浏览文件 @
f18cdd5f
此差异已折叠。
点击以展开。
source/libs/parser/test/parserAstTest.cpp
浏览文件 @
f18cdd5f
...
@@ -648,6 +648,48 @@ TEST_F(ParserTest, dropQnode) {
...
@@ -648,6 +648,48 @@ TEST_F(ParserTest, dropQnode) {
ASSERT_TRUE
(
run
());
ASSERT_TRUE
(
run
());
}
}
TEST_F
(
ParserTest
,
createBnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create bnode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
dropBnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"drop bnode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
createSnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create snode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
dropSnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"drop snode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
createMnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"create mnode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
dropMnode
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"drop mnode on dnode 1"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
createTopic
)
{
TEST_F
(
ParserTest
,
createTopic
)
{
setDatabase
(
"root"
,
"test"
);
setDatabase
(
"root"
,
"test"
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
f18cdd5f
...
@@ -72,7 +72,9 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
...
@@ -72,7 +72,9 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_OPERATOR
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_FUNCTION
:
{
case
QUERY_NODE_FUNCTION
:
{
sprintf
(((
SExprNode
*
)
pNode
)
->
aliasName
,
"#expr_%p"
,
pNode
);
if
(
'\0'
==
((
SExprNode
*
)
pNode
)
->
aliasName
[
0
])
{
sprintf
(((
SExprNode
*
)
pNode
)
->
aliasName
,
"#expr_%p"
,
pNode
);
}
return
DEAL_RES_IGNORE_CHILD
;
return
DEAL_RES_IGNORE_CHILD
;
}
}
default:
default:
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
f18cdd5f
...
@@ -259,6 +259,16 @@ TEST_F(PlannerTest, orderBy) {
...
@@ -259,6 +259,16 @@ TEST_F(PlannerTest, orderBy) {
ASSERT_TRUE
(
run
());
ASSERT_TRUE
(
run
());
}
}
TEST_F
(
PlannerTest
,
groupByOrderBy
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"select count(*), sum(c1) from t1 order by sum(c1)"
);
ASSERT_TRUE
(
run
());
bind
(
"select count(*), sum(c1) a from t1 order by a"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
PlannerTest
,
distinct
)
{
TEST_F
(
PlannerTest
,
distinct
)
{
setDatabase
(
"root"
,
"test"
);
setDatabase
(
"root"
,
"test"
);
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
f18cdd5f
...
@@ -247,14 +247,50 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
...
@@ -247,14 +247,50 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
}
}
bool
value
=
0
;
bool
value
=
0
;
GET_TYPED_DATA
(
value
,
int64_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
GET_TYPED_DATA
(
value
,
bool
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt8
(
pOutputCol
,
i
,
(
int8_t
*
)
&
value
);
colDataAppendInt8
(
pOutputCol
,
i
,
(
int8_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_TINYINT
:
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
int8_t
value
=
0
;
GET_TYPED_DATA
(
value
,
int8_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt8
(
pOutputCol
,
i
,
(
int8_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
int16_t
value
=
0
;
GET_TYPED_DATA
(
value
,
int16_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt16
(
pOutputCol
,
i
,
(
int16_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_INT
:{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
int32_t
value
=
0
;
GET_TYPED_DATA
(
value
,
int32_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt32
(
pOutputCol
,
i
,
(
int32_t
*
)
&
value
);
}
}
break
;
break
;
}
}
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
...
@@ -265,14 +301,50 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
...
@@ -265,14 +301,50 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
int64_t
value
=
0
;
int64_t
value
=
0
;
GET_TYPED_DATA
(
value
,
int64_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
GET_TYPED_DATA
(
value
,
int64_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt64
(
pOutputCol
,
i
,
&
value
);
colDataAppendInt64
(
pOutputCol
,
i
,
(
int64_t
*
)
&
value
);
}
}
break
;
break
;
}
}
case
TSDB_DATA_TYPE_UTINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:{
case
TSDB_DATA_TYPE_USMALLINT
:
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
case
TSDB_DATA_TYPE_UINT
:
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
case
TSDB_DATA_TYPE_UBIGINT
:
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
uint8_t
value
=
0
;
GET_TYPED_DATA
(
value
,
uint8_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt8
(
pOutputCol
,
i
,
(
int8_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_USMALLINT
:{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
uint16_t
value
=
0
;
GET_TYPED_DATA
(
value
,
uint16_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt16
(
pOutputCol
,
i
,
(
int16_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_UINT
:{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
uint32_t
value
=
0
;
GET_TYPED_DATA
(
value
,
uint32_t
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendInt32
(
pOutputCol
,
i
,
(
int32_t
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_UBIGINT
:
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
colDataAppendNULL
(
pOutputCol
,
i
);
...
@@ -284,8 +356,21 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
...
@@ -284,8 +356,21 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
colDataAppendInt64
(
pOutputCol
,
i
,
(
int64_t
*
)
&
value
);
colDataAppendInt64
(
pOutputCol
,
i
,
(
int64_t
*
)
&
value
);
}
}
break
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
}
case
TSDB_DATA_TYPE_DOUBLE
:
case
TSDB_DATA_TYPE_FLOAT
:{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
continue
;
}
float
value
=
0
;
GET_TYPED_DATA
(
value
,
float
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendFloat
(
pOutputCol
,
i
,
(
float
*
)
&
value
);
}
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pIn
->
numOfRows
;
++
i
)
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
if
(
colDataIsNull_f
(
pInputCol
->
nullbitmap
,
i
))
{
colDataAppendNULL
(
pOutputCol
,
i
);
colDataAppendNULL
(
pOutputCol
,
i
);
...
@@ -294,9 +379,10 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
...
@@ -294,9 +379,10 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
double
value
=
0
;
double
value
=
0
;
GET_TYPED_DATA
(
value
,
double
,
inType
,
colDataGetData
(
pInputCol
,
i
));
GET_TYPED_DATA
(
value
,
double
,
inType
,
colDataGetData
(
pInputCol
,
i
));
colDataAppendDouble
(
pOutputCol
,
i
,
&
value
);
colDataAppendDouble
(
pOutputCol
,
i
,
(
double
*
)
&
value
);
}
}
break
;
break
;
}
default:
default:
sclError
(
"invalid convert output type:%d"
,
outType
);
sclError
(
"invalid convert output type:%d"
,
outType
);
return
TSDB_CODE_QRY_APP_ERROR
;
return
TSDB_CODE_QRY_APP_ERROR
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录