Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
395dedaa
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
395dedaa
编写于
12月 01, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/TD-20798
上级
1e8a8161
b6f1ce20
变更
39
展开全部
隐藏空白更改
内联
并排
Showing
39 changed file
with
5608 addition
and
5523 deletion
+5608
-5523
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/common/systable.h
include/common/systable.h
+1
-0
include/common/tglobal.h
include/common/tglobal.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+13
-11
include/common/ttokendef.h
include/common/ttokendef.h
+278
-276
include/libs/executor/executor.h
include/libs/executor/executor.h
+0
-7
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+5
-4
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
source/client/src/clientSml.c
source/client/src/clientSml.c
+38
-25
source/common/src/systable.c
source/common/src/systable.c
+7
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+5
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-2
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+16
-16
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-22
source/libs/executor/inc/tfill.h
source/libs/executor/inc/tfill.h
+10
-6
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+0
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+10
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+14
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+0
-349
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+1494
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+0
-24
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+2
-1111
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+1
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1
-11
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+3
-1
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+3
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+3
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+2
-2
source/libs/parser/src/parAstParser.c
source/libs/parser/src/parAstParser.c
+7
-0
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+2
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+14
-3
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+3465
-3478
source/libs/parser/test/mockCatalog.cpp
source/libs/parser/test/mockCatalog.cpp
+6
-2
source/libs/parser/test/parAlterToBalanceTest.cpp
source/libs/parser/test/parAlterToBalanceTest.cpp
+2
-2
source/libs/parser/test/parExplainToSyncdbTest.cpp
source/libs/parser/test/parExplainToSyncdbTest.cpp
+28
-0
source/libs/parser/test/parShowToUse.cpp
source/libs/parser/test/parShowToUse.cpp
+7
-1
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-1
tests/system-test/0-others/compatibility.py
tests/system-test/0-others/compatibility.py
+161
-155
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
395dedaa
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
5445810
GIT_TAG
d5df76d
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/common/systable.h
浏览文件 @
395dedaa
...
...
@@ -47,6 +47,7 @@ extern "C" {
#define TSDB_INS_TABLE_TOPICS "ins_topics"
#define TSDB_INS_TABLE_STREAMS "ins_streams"
#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks"
#define TSDB_INS_TABLE_USER_PRIVILEGES "ins_user_privileges"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"
...
...
include/common/tglobal.h
浏览文件 @
395dedaa
...
...
@@ -129,6 +129,7 @@ extern char tsUdfdLdLibPath[];
extern
char
tsSmlChildTableName
[];
extern
char
tsSmlTagName
[];
extern
bool
tsSmlDataFormat
;
extern
int32_t
tsSmlBatchSize
;
// wal
extern
int64_t
tsWalFsyncDataSizeLimit
;
...
...
include/common/tmsg.h
浏览文件 @
395dedaa
...
...
@@ -141,16 +141,18 @@ typedef enum _mgmt_table {
#define TSDB_FILL_PREV 4
#define TSDB_FILL_NEXT 5
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ADD_READ_DB 0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6
#define TSDB_ALTER_USER_ADD_ALL_DB 0x7
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8
#define TSDB_ALTER_USER_ENABLE 0x9
#define TSDB_ALTER_USER_SYSINFO 0xA
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_SUPERUSER 0x2
#define TSDB_ALTER_USER_ADD_READ_DB 0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB 0x4
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x5
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x6
#define TSDB_ALTER_USER_ADD_ALL_DB 0x7
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0x8
#define TSDB_ALTER_USER_ENABLE 0x9
#define TSDB_ALTER_USER_SYSINFO 0xA
#define TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC 0xB
#define TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC 0xC
#define TSDB_ALTER_USER_PRIVILEGES 0x2
...
...
@@ -620,7 +622,7 @@ typedef struct {
int8_t
enable
;
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_USET_PASSWORD_LEN
];
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
objname
[
TSDB_DB_FNAME_LEN
];
// db or topic
}
SAlterUserReq
;
int32_t
tSerializeSAlterUserReq
(
void
*
buf
,
int32_t
bufLen
,
SAlterUserReq
*
pReq
);
...
...
include/common/ttokendef.h
浏览文件 @
395dedaa
此差异已折叠。
点击以展开。
include/libs/executor/executor.h
浏览文件 @
395dedaa
...
...
@@ -160,13 +160,6 @@ int32_t qAsyncKillTask(qTaskInfo_t tinfo);
*/
void
qDestroyTask
(
qTaskInfo_t
tinfo
);
/**
* Get the queried table uid
* @param qHandle
* @return
*/
int64_t
qGetQueriedTableUid
(
qTaskInfo_t
tinfo
);
/**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
*
...
...
include/libs/nodes/cmdnodes.h
浏览文件 @
395dedaa
...
...
@@ -42,9 +42,10 @@ extern "C" {
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...
...
@@ -423,7 +424,7 @@ typedef struct SDropFunctionStmt {
typedef
struct
SGrantStmt
{
ENodeType
type
;
char
userName
[
TSDB_USER_LEN
];
char
dbName
[
TSDB_DB_NAME_LEN
];
char
objName
[
TSDB_DB_NAME_LEN
];
// db or topic
int64_t
privileges
;
}
SGrantStmt
;
...
...
include/libs/nodes/nodes.h
浏览文件 @
395dedaa
...
...
@@ -187,6 +187,7 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_TRANSACTIONS_STMT
,
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT
,
QUERY_NODE_SHOW_VNODES_STMT
,
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT
,
QUERY_NODE_SHOW_CREATE_TABLE_STMT
,
QUERY_NODE_SHOW_CREATE_STABLE_STMT
,
...
...
source/client/src/clientSml.c
浏览文件 @
395dedaa
...
...
@@ -79,7 +79,6 @@
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
#define MAX_RETRY_TIMES 5
#define LINE_BATCH 2000
//=================================================================================================
typedef
TSDB_SML_PROTOCOL_TYPE
SMLProtocolType
;
...
...
@@ -467,6 +466,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto
end
;
}
info
->
cost
.
numOfCreateSTables
++
;
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
else
if
(
code
==
TSDB_CODE_SUCCESS
)
{
hashTmp
=
taosHashInit
(
pTableMeta
->
tableInfo
.
numOfTags
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
...
...
@@ -505,16 +511,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" smlSendMetaMsg failed. can not create %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
}
taosHashClear
(
hashTmp
);
...
...
@@ -552,12 +558,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" smlSendMetaMsg failed. can not create %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
code
=
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
-
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
}
needCheckMeta
=
true
;
taosHashCleanup
(
hashTmp
);
hashTmp
=
NULL
;
...
...
@@ -565,13 +577,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uError
(
"SML:0x%"
PRIx64
" load table meta error: %s"
,
info
->
id
,
tstrerror
(
code
));
goto
end
;
}
taosMemoryFreeClear
(
pTableMeta
);
code
=
catalogGetSTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
pTableMeta
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetSTableMeta failed. super table name %s"
,
info
->
id
,
pName
.
tname
);
goto
end
;
}
if
(
needCheckMeta
)
{
code
=
smlCheckMeta
(
&
(
pTableMeta
->
schema
[
pTableMeta
->
tableInfo
.
numOfColumns
]),
pTableMeta
->
tableInfo
.
numOfTags
,
...
...
@@ -596,7 +601,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
end:
taosHashCleanup
(
hashTmp
);
taosMemoryFreeClear
(
pTableMeta
);
catalogRefreshTableMeta
(
info
->
pCatalog
,
&
conn
,
&
pName
,
1
);
//
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
return
code
;
}
...
...
@@ -815,6 +820,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) {
}
static
int64_t
smlParseInfluxTime
(
SSmlHandle
*
info
,
const
char
*
data
,
int32_t
len
)
{
void
*
tmp
=
taosMemoryCalloc
(
1
,
len
+
1
);
memcpy
(
tmp
,
data
,
len
);
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxTime tslen:%d, ts:%s"
,
info
->
id
,
len
,
(
char
*
)
tmp
);
taosMemoryFree
(
tmp
);
if
(
len
==
0
||
(
len
==
1
&&
data
[
0
]
==
'0'
))
{
return
taosGetTimestampNs
();
}
...
...
@@ -2066,7 +2076,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static
int32_t
smlParseInfluxLine
(
SSmlHandle
*
info
,
const
char
*
sql
,
const
int
len
)
{
SSmlLineInfo
elements
=
{
0
};
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxLine sql:%s"
,
info
->
id
,
(
info
->
isRawLine
?
"rawdata"
:
sql
));
void
*
tmp
=
taosMemoryCalloc
(
1
,
len
+
1
);
memcpy
(
tmp
,
sql
,
len
);
uDebug
(
"SML:0x%"
PRIx64
" smlParseInfluxLine raw:%d, len:%d, sql:%s"
,
info
->
id
,
info
->
isRawLine
,
len
,
(
info
->
isRawLine
?
(
char
*
)
tmp
:
sql
));
taosMemoryFree
(
tmp
);
int
ret
=
smlParseInfluxString
(
sql
,
sql
+
len
,
&
elements
,
&
info
->
msgBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2562,7 +2575,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
goto
end
;
}
batchs
=
ceil
(((
double
)
numLines
)
/
LINE_BATCH
);
batchs
=
ceil
(((
double
)
numLines
)
/
tsSmlBatchSize
);
params
.
total
=
batchs
;
for
(
int
i
=
0
;
i
<
batchs
;
++
i
)
{
SRequestObj
*
req
=
(
SRequestObj
*
)
createRequest
(
pTscObj
->
id
,
TSDB_SQL_INSERT
,
0
);
...
...
@@ -2581,7 +2594,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
info
->
isRawLine
=
(
rawLine
==
NULL
);
info
->
ttl
=
ttl
;
int32_t
perBatch
=
LINE_BATCH
;
int32_t
perBatch
=
tsSmlBatchSize
;
if
(
numLines
>
perBatch
)
{
numLines
-=
perBatch
;
...
...
source/common/src/systable.c
浏览文件 @
395dedaa
...
...
@@ -273,6 +273,12 @@ static const SSysDbTableSchema vnodesSchema[] = {
{.
name
=
"dnode_ep"
,
.
bytes
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
};
static
const
SSysDbTableSchema
userUserPrivilegesSchema
[]
=
{
{.
name
=
"user_name"
,
.
bytes
=
TSDB_USER_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"privilege"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"object_name"
,
.
bytes
=
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
};
static
const
SSysTableMeta
infosMeta
[]
=
{
{
TSDB_INS_TABLE_DNODES
,
dnodesSchema
,
tListLen
(
dnodesSchema
),
true
},
{
TSDB_INS_TABLE_MNODES
,
mnodesSchema
,
tListLen
(
mnodesSchema
),
true
},
...
...
@@ -297,6 +303,7 @@ static const SSysTableMeta infosMeta[] = {
{
TSDB_INS_TABLE_STREAMS
,
streamSchema
,
tListLen
(
streamSchema
),
false
},
{
TSDB_INS_TABLE_STREAM_TASKS
,
streamTaskSchema
,
tListLen
(
streamTaskSchema
),
false
},
{
TSDB_INS_TABLE_VNODES
,
vnodesSchema
,
tListLen
(
vnodesSchema
),
true
},
{
TSDB_INS_TABLE_USER_PRIVILEGES
,
userUserPrivilegesSchema
,
tListLen
(
userUserPrivilegesSchema
),
false
},
};
static
const
SSysDbTableSchema
connectionsSchema
[]
=
{
...
...
source/common/src/tglobal.c
浏览文件 @
395dedaa
...
...
@@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// If set to empty system will generate table name using MD5 hash.
// true means that the name and order of cols in each line are the same(only for influx protocol)
bool
tsSmlDataFormat
=
false
;
int32_t
tsSmlBatchSize
=
10000
;
// query
int32_t
tsQueryPolicy
=
1
;
...
...
@@ -306,6 +307,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddString
(
pCfg
,
"smlChildTableName"
,
""
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTagName"
,
tsSmlTagName
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"smlDataFormat"
,
tsSmlDataFormat
,
1
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"smlBatchSize"
,
tsSmlBatchSize
,
1
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"maxMemUsedByInsert"
,
tsMaxMemUsedByInsert
,
1
,
INT32_MAX
,
true
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"rpcRetryLimit"
,
tsRpcRetryLimit
,
1
,
100000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"rpcRetryInterval"
,
tsRpcRetryInterval
,
1
,
100000
,
0
)
!=
0
)
return
-
1
;
...
...
@@ -648,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
tsSmlBatchSize
=
cfgGetItem
(
pCfg
,
"smlBatchSize"
)
->
i32
;
tsMaxMemUsedByInsert
=
cfgGetItem
(
pCfg
,
"maxMemUsedByInsert"
)
->
i32
;
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
...
...
@@ -1021,6 +1024,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
}
else
if
(
strcasecmp
(
"smlDataFormat"
,
name
)
==
0
)
{
tsSmlDataFormat
=
cfgGetItem
(
pCfg
,
"smlDataFormat"
)
->
bval
;
}
else
if
(
strcasecmp
(
"smlBatchSize"
,
name
)
==
0
)
{
tsSmlBatchSize
=
cfgGetItem
(
pCfg
,
"smlBatchSize"
)
->
i32
;
}
else
if
(
strcasecmp
(
"shellActivityTimer"
,
name
)
==
0
)
{
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
}
else
if
(
strcasecmp
(
"supportVnodes"
,
name
)
==
0
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
395dedaa
...
...
@@ -1298,7 +1298,7 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq)
if
(
tEncodeI8
(
&
encoder
,
pReq
->
enable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
user
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
pass
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
obj
name
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -1317,7 +1317,7 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
enable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
user
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pass
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
obj
name
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
395dedaa
...
...
@@ -507,14 +507,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_READ_DB
||
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_ALL_DB
)
{
if
(
strcmp
(
alterReq
.
db
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
db
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
name
);
if
(
strcmp
(
alterReq
.
obj
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
obj
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
obj
name
);
if
(
pDb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
if
(
taosHashPut
(
newUser
.
readDbs
,
alterReq
.
dbname
,
len
,
alterReq
.
db
name
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
if
(
taosHashPut
(
newUser
.
readDbs
,
alterReq
.
objname
,
len
,
alterReq
.
obj
name
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
...
...
@@ -531,14 +531,14 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_WRITE_DB
||
alterReq
.
alterType
==
TSDB_ALTER_USER_ADD_ALL_DB
)
{
if
(
strcmp
(
alterReq
.
db
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
db
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
name
);
if
(
strcmp
(
alterReq
.
obj
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
obj
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
obj
name
);
if
(
pDb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
if
(
taosHashPut
(
newUser
.
writeDbs
,
alterReq
.
dbname
,
len
,
alterReq
.
db
name
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
if
(
taosHashPut
(
newUser
.
writeDbs
,
alterReq
.
objname
,
len
,
alterReq
.
obj
name
,
TSDB_DB_FNAME_LEN
)
!=
0
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
...
...
@@ -555,28 +555,28 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_READ_DB
||
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_ALL_DB
)
{
if
(
strcmp
(
alterReq
.
db
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
db
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
name
);
if
(
strcmp
(
alterReq
.
obj
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
obj
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
obj
name
);
if
(
pDb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
taosHashRemove
(
newUser
.
readDbs
,
alterReq
.
db
name
,
len
);
taosHashRemove
(
newUser
.
readDbs
,
alterReq
.
obj
name
,
len
);
}
else
{
taosHashClear
(
newUser
.
readDbs
);
}
}
if
(
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_WRITE_DB
||
alterReq
.
alterType
==
TSDB_ALTER_USER_REMOVE_ALL_DB
)
{
if
(
strcmp
(
alterReq
.
db
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
db
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
db
name
);
if
(
strcmp
(
alterReq
.
obj
name
,
"1.*"
)
!=
0
)
{
int32_t
len
=
strlen
(
alterReq
.
obj
name
)
+
1
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
alterReq
.
obj
name
);
if
(
pDb
==
NULL
)
{
mndReleaseDb
(
pMnode
,
pDb
);
goto
_OVER
;
}
taosHashRemove
(
newUser
.
writeDbs
,
alterReq
.
db
name
,
len
);
taosHashRemove
(
newUser
.
writeDbs
,
alterReq
.
obj
name
,
len
);
}
else
{
taosHashClear
(
newUser
.
writeDbs
);
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
395dedaa
...
...
@@ -161,4 +161,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
395dedaa
...
...
@@ -538,23 +538,6 @@ typedef struct SStreamIntervalOperatorInfo {
SWinKey
delKey
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
struct
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pFinalRes
;
int64_t
totalInputRows
;
void
**
p
;
SSDataBlock
*
existNewGroupBlock
;
STimeWindow
win
;
SColMatchInfo
matchInfo
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
uint64_t
curGroupId
;
// current handled group id
SExprInfo
*
pExprInfo
;
int32_t
numOfExpr
;
SExprSupp
noFillExprSupp
;
}
SFillOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
...
...
@@ -806,8 +789,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
qTaskInfo_t
*
pTaskInfo
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
getMaximumIdleDurationSec
();
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
...
...
@@ -825,9 +806,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRowToStreamSpecialBlock
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
,
void
*
pTbName
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/inc/tfill.h
浏览文件 @
395dedaa
...
...
@@ -25,6 +25,8 @@ extern "C" {
#include "tcommon.h"
#include "tsimplehash.h"
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
struct
SSDataBlock
;
typedef
struct
SFillColInfo
{
...
...
@@ -113,12 +115,12 @@ typedef struct SStreamFillInfo {
int64_t
getNumOfResultsAfterFillGap
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
struct
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
void
taosFillSetStartInfo
(
struct
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
void
taosResetFillInfo
(
struct
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosFillSetInputDataBlock
(
struct
SFillInfo
*
pFillInfo
,
const
struct
SSDataBlock
*
pInput
);
SFillColInfo
*
createFillColInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfFillExpr
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
const
struct
SNodeListNode
*
val
);
bool
taosFillHasMoreResults
(
struct
SFillInfo
*
pFillInfo
);
SFillInfo
*
taosCreateFillInfo
(
TSKEY
skey
,
int32_t
numOfFillCols
,
int32_t
numOfNotFillCols
,
int32_t
capacity
,
SInterval
*
pInterval
,
int32_t
fillType
,
struct
SFillColInfo
*
pCol
,
int32_t
slotId
,
...
...
@@ -128,6 +130,8 @@ void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
int64_t
taosFillResultDataBlock
(
struct
SFillInfo
*
pFillInfo
,
SSDataBlock
*
p
,
int32_t
capacity
);
int64_t
getFillInfoStart
(
struct
SFillInfo
*
pFillInfo
);
bool
fillIfWindowPseudoColumn
(
SFillInfo
*
pFillInfo
,
SFillColInfo
*
pCol
,
SColumnInfoData
*
pDstColInfoData
,
int32_t
rowIndex
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
395dedaa
...
...
@@ -15,26 +15,15 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
395dedaa
...
...
@@ -2002,3 +2002,13 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return
TSDB_CODE_SUCCESS
;
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
\ No newline at end of file
source/libs/executor/src/executor.c
浏览文件 @
395dedaa
...
...
@@ -704,6 +704,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return
TSDB_CODE_SUCCESS
;
}
static
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
395dedaa
...
...
@@ -91,7 +91,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
static
void
releaseQueryBuf
(
size_t
numOfTables
);
static
void
destroyFillOperatorInfo
(
void
*
param
);
static
void
destroyAggOperatorInfo
(
void
*
param
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
doSetTableGroupOutputBuf
(
SOperatorInfo
*
pOperator
,
int32_t
numOfOutput
,
uint64_t
groupId
);
...
...
@@ -1157,20 +1156,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
}
void
printTaskExecCostInLog
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
"load block SMA:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pSummary
->
extractListTime
,
pSummary
->
groupIdMapTime
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
}
}
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
...
...
@@ -1603,179 +1588,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return
(
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
static
void
doHandleRemainBlockForNewGroupImpl
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
pInfo
->
totalInputRows
=
pInfo
->
existNewGroupBlock
->
info
.
rows
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
int64_t
ekey
=
pInfo
->
existNewGroupBlock
->
info
.
window
.
ekey
;
taosResetFillInfo
(
pInfo
->
pFillInfo
,
getFillInfoStart
(
pInfo
->
pFillInfo
));
blockDataCleanup
(
pInfo
->
pRes
);
doApplyScalarCalculation
(
pOperator
,
pInfo
->
existNewGroupBlock
,
order
,
scanFlag
);
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
ekey
);
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
pInfo
->
curGroupId
=
pInfo
->
existNewGroupBlock
->
info
.
id
.
groupId
;
pInfo
->
existNewGroupBlock
=
NULL
;
}
static
void
doHandleRemainBlockFromNewGroup
(
SOperatorInfo
*
pOperator
,
SFillOperatorInfo
*
pInfo
,
SResultInfo
*
pResultInfo
,
SExecTaskInfo
*
pTaskInfo
)
{
if
(
taosFillHasMoreResults
(
pInfo
->
pFillInfo
))
{
int32_t
numOfResultRows
=
pResultInfo
->
capacity
-
pInfo
->
pFinalRes
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pFinalRes
,
numOfResultRows
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
;
}
// handle the cached new group data block
if
(
pInfo
->
existNewGroupBlock
)
{
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
}
}
static
void
doApplyScalarCalculation
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pSup
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pSup
->
pCtx
,
pSup
->
numOfExprs
,
NULL
);
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
pInfo
->
pRes
->
info
.
rows
=
0
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
setInputDataBlock
(
pNoFillSupp
,
pBlock
,
order
,
scanFlag
,
false
);
projectApplyFunctions
(
pNoFillSupp
->
pExprInfo
,
pInfo
->
pRes
,
pBlock
,
pNoFillSupp
->
pCtx
,
pNoFillSupp
->
numOfExprs
,
NULL
);
pInfo
->
pRes
->
info
.
id
.
groupId
=
pBlock
->
info
.
id
.
groupId
;
}
static
SSDataBlock
*
doFillImpl
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
SSDataBlock
*
pResBlock
=
pInfo
->
pFinalRes
;
blockDataCleanup
(
pResBlock
);
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
);
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
0
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
SSDataBlock
*
pBlock
=
pDownstream
->
fpSet
.
getNextFn
(
pDownstream
);
if
(
pBlock
==
NULL
)
{
if
(
pInfo
->
totalInputRows
==
0
)
{
setOperatorCompleted
(
pOperator
);
return
NULL
;
}
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
else
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
id
.
groupId
)
{
pInfo
->
curGroupId
=
pInfo
->
pRes
->
info
.
id
.
groupId
;
// the first data block
pInfo
->
totalInputRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
order
==
pInfo
->
pFillInfo
->
order
)
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
ekey
);
}
else
{
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
->
info
.
rows
,
pBlock
->
info
.
window
.
skey
);
}
taosFillSetInputDataBlock
(
pInfo
->
pFillInfo
,
pInfo
->
pRes
);
}
else
if
(
pInfo
->
curGroupId
!=
pBlock
->
info
.
id
.
groupId
)
{
// the new group data block
pInfo
->
existNewGroupBlock
=
pBlock
;
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
}
int32_t
numOfResultRows
=
pOperator
->
resultInfo
.
capacity
-
pResBlock
->
info
.
rows
;
taosFillResultDataBlock
(
pInfo
->
pFillInfo
,
pResBlock
,
numOfResultRows
);
// current group has no more result to return
if
(
pResBlock
->
info
.
rows
>
0
)
{
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
||
pBlock
==
NULL
||
pInfo
->
existNewGroupBlock
!=
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
||
pBlock
==
NULL
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
if
(
pInfo
->
existNewGroupBlock
)
{
// try next group
assert
(
pBlock
!=
NULL
);
blockDataCleanup
(
pResBlock
);
doHandleRemainBlockForNewGroupImpl
(
pOperator
,
pInfo
,
pResultInfo
,
pTaskInfo
);
if
(
pResBlock
->
info
.
rows
>
pResultInfo
->
threshold
)
{
pResBlock
->
info
.
id
.
groupId
=
pInfo
->
curGroupId
;
return
pResBlock
;
}
}
else
{
return
NULL
;
}
}
}
static
SSDataBlock
*
doFill
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SSDataBlock
*
fillResult
=
NULL
;
while
(
true
)
{
fillResult
=
doFillImpl
(
pOperator
);
if
(
fillResult
==
NULL
)
{
setOperatorCompleted
(
pOperator
);
break
;
}
doFilter
(
fillResult
,
pOperator
->
exprSupp
.
pFilterInfo
,
&
pInfo
->
matchInfo
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
}
if
(
fillResult
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
fillResult
->
info
.
rows
;
}
return
fillResult
;
}
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
)
{
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExpr
[
i
];
...
...
@@ -2045,167 +1857,6 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear
(
param
);
}
void
destroyFillOperatorInfo
(
void
*
param
)
{
SFillOperatorInfo
*
pInfo
=
(
SFillOperatorInfo
*
)
param
;
pInfo
->
pFillInfo
=
taosDestroyFillInfo
(
pInfo
->
pFillInfo
);
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
pInfo
->
pFinalRes
=
blockDataDestroy
(
pInfo
->
pFinalRes
);
cleanupExprSupp
(
&
pInfo
->
noFillExprSupp
);
taosMemoryFreeClear
(
pInfo
->
p
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
initFillInfo
(
SFillOperatorInfo
*
pInfo
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SExprInfo
*
pNotFillExpr
,
int32_t
numOfNotFillCols
,
SNodeListNode
*
pValNode
,
STimeWindow
win
,
int32_t
capacity
,
const
char
*
id
,
SInterval
*
pInterval
,
int32_t
fillType
,
int32_t
order
)
{
SFillColInfo
*
pColInfo
=
createFillColInfo
(
pExpr
,
numOfCols
,
pNotFillExpr
,
numOfNotFillCols
,
pValNode
);
int64_t
startKey
=
(
order
==
TSDB_ORDER_ASC
)
?
win
.
skey
:
win
.
ekey
;
STimeWindow
w
=
getAlignQueryTimeWindow
(
pInterval
,
pInterval
->
precision
,
startKey
);
w
=
getFirstQualifiedTimeWindow
(
startKey
,
&
w
,
pInterval
,
order
);
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
w
.
skey
,
numOfCols
,
numOfNotFillCols
,
capacity
,
pInterval
,
fillType
,
pColInfo
,
pInfo
->
primaryTsCol
,
order
,
id
);
if
(
order
==
TSDB_ORDER_ASC
)
{
pInfo
->
win
.
skey
=
win
.
skey
;
pInfo
->
win
.
ekey
=
win
.
ekey
;
}
else
{
pInfo
->
win
.
skey
=
win
.
ekey
;
pInfo
->
win
.
ekey
=
win
.
skey
;
}
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
taosMemoryFree
(
pInfo
->
pFillInfo
);
taosMemoryFree
(
pInfo
->
p
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
return
TSDB_CODE_SUCCESS
;
}
}
static
bool
isWstartColumnExist
(
SFillOperatorInfo
*
pInfo
)
{
if
(
pInfo
->
noFillExprSupp
.
numOfExprs
==
0
)
{
return
false
;
}
for
(
int32_t
i
=
0
;
i
<
pInfo
->
noFillExprSupp
.
numOfExprs
;
++
i
)
{
SExprInfo
*
exprInfo
=
pInfo
->
noFillExprSupp
.
pExprInfo
+
i
;
if
(
exprInfo
->
pExpr
->
nodeType
==
QUERY_NODE_COLUMN
&&
exprInfo
->
base
.
numOfParams
==
1
&&
exprInfo
->
base
.
pParam
[
0
].
pCol
->
colType
==
COLUMN_TYPE_WINDOW_START
)
{
return
true
;
}
}
return
false
;
}
static
int32_t
createPrimaryTsExprIfNeeded
(
SFillOperatorInfo
*
pInfo
,
SFillPhysiNode
*
pPhyFillNode
,
SExprSupp
*
pExprSupp
,
const
char
*
idStr
)
{
bool
wstartExist
=
isWstartColumnExist
(
pInfo
);
if
(
wstartExist
==
false
)
{
if
(
pPhyFillNode
->
pWStartTs
->
type
!=
QUERY_NODE_TARGET
)
{
qError
(
"pWStartTs of fill physical node is not a target node, %s"
,
idStr
);
return
TSDB_CODE_QRY_SYS_ERROR
;
}
SExprInfo
*
pExpr
=
taosMemoryRealloc
(
pExprSupp
->
pExprInfo
,
(
pExprSupp
->
numOfExprs
+
1
)
*
sizeof
(
SExprInfo
));
if
(
pExpr
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
createExprFromTargetNode
(
&
pExpr
[
pExprSupp
->
numOfExprs
],
(
STargetNode
*
)
pPhyFillNode
->
pWStartTs
);
pExprSupp
->
numOfExprs
+=
1
;
pExprSupp
->
pExprInfo
=
pExpr
;
}
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SFillOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SFillOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pPhyFillNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pFillExprs
,
NULL
,
&
pInfo
->
numOfExpr
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
SExprSupp
*
pNoFillSupp
=
&
pInfo
->
noFillExprSupp
;
pNoFillSupp
->
pExprInfo
=
createExprInfo
(
pPhyFillNode
->
pNotFillExprs
,
NULL
,
&
pNoFillSupp
->
numOfExprs
);
int32_t
code
=
createPrimaryTsExprIfNeeded
(
pInfo
,
pPhyFillNode
,
pNoFillSupp
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
code
=
initExprSupp
(
pNoFillSupp
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
SInterval
*
pInterval
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
downstream
->
operatorType
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
pInfo
->
numOfExpr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
primaryTsCol
=
((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
slotId
;
pInfo
->
primarySrcSlotId
=
((
SColumnNode
*
)((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
pExpr
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
code
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
pTaskInfo
->
id
.
str
,
pInterval
,
type
,
order
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pFinalRes
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pOperator
->
resultInfo
.
capacity
);
code
=
filterInitFromNode
((
SNode
*
)
pPhyFillNode
->
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
setOperatorInfo
(
pOperator
,
"FillOperator"
,
QUERY_NODE_PHYSICAL_PLAN_FILL
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
pInfo
->
numOfExpr
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doFill
,
NULL
,
destroyFillOperatorInfo
,
NULL
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroyFillOperatorInfo
(
pInfo
);
}
pTaskInfo
->
code
=
code
;
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
static
SExecTaskInfo
*
createExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
...
...
source/libs/executor/src/filloperator.c
0 → 100644
浏览文件 @
395dedaa
此差异已折叠。
点击以展开。
source/libs/executor/src/scanoperator.c
浏览文件 @
395dedaa
...
...
@@ -1339,30 +1339,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return
code
;
}
static
void
calBlockTag
(
SExprSupp
*
pTagCalSup
,
SSDataBlock
*
pBlock
,
SSDataBlock
*
pResBlock
)
{
if
(
pTagCalSup
==
NULL
||
pTagCalSup
->
numOfExprs
==
0
)
return
;
if
(
pBlock
==
NULL
||
pBlock
->
info
.
rows
==
0
)
return
;
SSDataBlock
*
pSrcBlock
=
blockCopyOneRow
(
pBlock
,
0
);
ASSERT
(
pSrcBlock
->
info
.
rows
==
1
);
blockDataEnsureCapacity
(
pResBlock
,
1
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
1
,
NULL
);
ASSERT
(
pResBlock
->
info
.
rows
==
1
);
// build tagArray
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
/*STagVal tagVal = {*/
/*.cid = 0,*/
/*.type = 0,*/
/*};*/
// build STag
// set STag
blockDataDestroy
(
pSrcBlock
);
}
void
calBlockTbName
(
SStreamScanInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
SExprSupp
*
pTbNameCalSup
=
&
pInfo
->
tbnameCalSup
;
SStreamState
*
pState
=
pInfo
->
pStreamScanOp
->
pTaskInfo
->
streamInfo
.
pState
;
...
...
source/libs/executor/src/tfill.c
浏览文件 @
395dedaa
此差异已折叠。
点击以展开。
source/libs/executor/src/timesliceoperator.c
浏览文件 @
395dedaa
...
...
@@ -467,11 +467,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// add current row if timestamp match
if
(
ts
==
pSliceInfo
->
current
&&
pSliceInfo
->
current
<=
pSliceInfo
->
win
.
ekey
)
{
addCurrentRowToResult
(
pSliceInfo
,
&
pOperator
->
exprSupp
,
pResBlock
,
pBlock
,
i
);
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
}
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
setOperatorCompleted
(
pOperator
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
395dedaa
...
...
@@ -677,16 +677,6 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
}
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
if
(
!
pBlock
||
pBlock
->
info
.
rows
==
0
)
{
qDebug
(
"===stream===printDataBlock: Block is Null or Empty"
);
return
;
}
char
*
pBuf
=
NULL
;
qDebug
(
"%s"
,
dumpBlockData
(
pBlock
,
flag
,
&
pBuf
));
taosMemoryFree
(
pBuf
);
}
typedef
int32_t
(
*
__compare_fn_t
)(
void
*
pKey
,
void
*
data
,
int32_t
index
);
int32_t
binarySearchCom
(
void
*
keyList
,
int
num
,
void
*
pKey
,
int
order
,
__compare_fn_t
comparefn
)
{
...
...
@@ -3854,7 +3844,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamStateAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
int32_t
numOfOutput
=
pOperator
->
exprSupp
.
numOfExprs
;
int64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
uint64_t
groupId
=
pSDataBlock
->
info
.
id
.
groupId
;
int64_t
code
=
TSDB_CODE_SUCCESS
;
TSKEY
*
tsCols
=
NULL
;
SResultRow
*
pResult
=
NULL
;
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
395dedaa
...
...
@@ -231,8 +231,10 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SIF_ERR_RET
(
sifGetValueFromNode
(
node
,
&
param
->
condValue
));
param
->
colId
=
-
1
;
param
->
colValType
=
(
uint8_t
)(
vn
->
node
.
resType
.
type
);
if
(
strlen
(
vn
->
literal
)
<=
sizeof
(
param
->
colName
))
{
if
(
vn
->
literal
!=
NULL
&&
strlen
(
vn
->
literal
)
<=
sizeof
(
param
->
colName
))
{
memcpy
(
param
->
colName
,
vn
->
literal
,
strlen
(
vn
->
literal
));
}
else
{
param
->
status
=
SFLT_NOT_INDEX
;
}
break
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
395dedaa
...
...
@@ -424,6 +424,7 @@ SNode* nodesMakeNode(ENodeType type) {
case
QUERY_NODE_SHOW_TRANSACTIONS_STMT
:
case
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT
:
case
QUERY_NODE_SHOW_TAGS_STMT
:
case
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
:
return
makeNode
(
type
,
sizeof
(
SShowStmt
));
case
QUERY_NODE_SHOW_TABLE_TAGS_STMT
:
return
makeNode
(
type
,
sizeof
(
SShowTableTagsStmt
));
...
...
@@ -943,7 +944,8 @@ void nodesDestroyNode(SNode* pNode) {
case
QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT
:
case
QUERY_NODE_SHOW_TRANSACTIONS_STMT
:
case
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT
:
case
QUERY_NODE_SHOW_TAGS_STMT
:
{
case
QUERY_NODE_SHOW_TAGS_STMT
:
case
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
:
{
SShowStmt
*
pStmt
=
(
SShowStmt
*
)
pNode
;
nodesDestroyNode
(
pStmt
->
pDbName
);
nodesDestroyNode
(
pStmt
->
pTbName
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
395dedaa
...
...
@@ -101,6 +101,7 @@ cmd ::= REVOKE privileges(A) ON priv_level(B) FROM user_name(C).
%destructor privileges { }
privileges(A) ::= ALL. { A = PRIVILEGE_TYPE_ALL; }
privileges(A) ::= priv_type_list(B). { A = B; }
privileges(A) ::= SUBSCRIBE. { A = PRIVILEGE_TYPE_SUBSCRIBE; }
%type priv_type_list { int64_t }
%destructor priv_type_list { }
...
...
@@ -116,6 +117,7 @@ priv_type(A) ::= WRITE.
%destructor priv_level { }
priv_level(A) ::= NK_STAR(B) NK_DOT NK_STAR. { A = B; }
priv_level(A) ::= db_name(B) NK_DOT NK_STAR. { A = B; }
priv_level(A) ::= topic_name(B). { A = B; }
/************************************************ create/drop/alter dnode *********************************************/
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
...
...
@@ -393,6 +395,7 @@ col_name(A) ::= column_name(B).
/************************************************ show ****************************************************************/
cmd ::= SHOW DNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DNODES_STMT); }
cmd ::= SHOW USERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USERS_STMT); }
cmd ::= SHOW USER PRIVILEGES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_USER_PRIVILEGES_STMT); }
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT); }
cmd ::= SHOW db_name_cond_opt(A) TABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TABLES_STMT, A, B, OP_TYPE_LIKE); }
cmd ::= SHOW db_name_cond_opt(A) STABLES like_pattern_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_STABLES_STMT, A, B, OP_TYPE_LIKE); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
395dedaa
...
...
@@ -1815,7 +1815,7 @@ SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbN
SGrantStmt
*
pStmt
=
(
SGrantStmt
*
)
nodesMakeNode
(
QUERY_NODE_GRANT_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
privileges
=
privileges
;
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
db
Name
,
pDbName
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
obj
Name
,
pDbName
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
userName
,
pUserName
);
return
(
SNode
*
)
pStmt
;
}
...
...
@@ -1828,7 +1828,7 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
SRevokeStmt
*
pStmt
=
(
SRevokeStmt
*
)
nodesMakeNode
(
QUERY_NODE_REVOKE_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
pStmt
->
privileges
=
privileges
;
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
db
Name
,
pDbName
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
obj
Name
,
pDbName
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
userName
,
pUserName
);
return
(
SNode
*
)
pStmt
;
}
...
...
source/libs/parser/src/parAstParser.c
浏览文件 @
395dedaa
...
...
@@ -504,6 +504,11 @@ static int32_t collectMetaKeyFromShowVnodes(SCollectMetaKeyCxt* pCxt, SShowVnode
pCxt
->
pMetaCache
);
}
static
int32_t
collectMetaKeyFromShowUserPrivileges
(
SCollectMetaKeyCxt
*
pCxt
,
SShowStmt
*
pStmt
)
{
return
reserveTableMetaInCache
(
pCxt
->
pParseCxt
->
acctId
,
TSDB_INFORMATION_SCHEMA_DB
,
TSDB_INS_TABLE_USER_PRIVILEGES
,
pCxt
->
pMetaCache
);
}
static
int32_t
collectMetaKeyFromShowCreateDatabase
(
SCollectMetaKeyCxt
*
pCxt
,
SShowCreateDatabaseStmt
*
pStmt
)
{
return
reserveDbCfgInCache
(
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
dbName
,
pCxt
->
pMetaCache
);
}
...
...
@@ -648,6 +653,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return
collectMetaKeyFromShowDnodeVariables
(
pCxt
,
(
SShowDnodeVariablesStmt
*
)
pStmt
);
case
QUERY_NODE_SHOW_VNODES_STMT
:
return
collectMetaKeyFromShowVnodes
(
pCxt
,
(
SShowVnodesStmt
*
)
pStmt
);
case
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
:
return
collectMetaKeyFromShowUserPrivileges
(
pCxt
,
(
SShowStmt
*
)
pStmt
);
case
QUERY_NODE_SHOW_CREATE_DATABASE_STMT
:
return
collectMetaKeyFromShowCreateDatabase
(
pCxt
,
(
SShowCreateDatabaseStmt
*
)
pStmt
);
case
QUERY_NODE_SHOW_CREATE_TABLE_STMT
:
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
395dedaa
...
...
@@ -161,6 +161,7 @@ static SKeyword keywordTable[] = {
{
"PPS"
,
TK_PPS
},
{
"PRECISION"
,
TK_PRECISION
},
{
"PREV"
,
TK_PREV
},
{
"PRIVILEGES"
,
TK_PRIVILEGES
},
{
"QNODE"
,
TK_QNODE
},
{
"QNODES"
,
TK_QNODES
},
{
"QTIME"
,
TK_QTIME
},
...
...
@@ -202,6 +203,7 @@ static SKeyword keywordTable[] = {
{
"STREAM"
,
TK_STREAM
},
{
"STREAMS"
,
TK_STREAMS
},
{
"STRICT"
,
TK_STRICT
},
{
"SUBSCRIBE"
,
TK_SUBSCRIBE
},
{
"SUBSCRIPTIONS"
,
TK_SUBSCRIPTIONS
},
{
"SUBTABLE"
,
TK_SUBTABLE
},
{
"SYSINFO"
,
TK_SYSINFO
},
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
395dedaa
...
...
@@ -251,6 +251,12 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.
numOfShowCols
=
1
,
.
pShowCols
=
{
"*"
}
},
{
.
showType
=
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
,
.
pDbName
=
TSDB_INFORMATION_SCHEMA_DB
,
.
pTableName
=
TSDB_INS_TABLE_USER_PRIVILEGES
,
.
numOfShowCols
=
1
,
.
pShowCols
=
{
"*"
}
},
};
// clang-format on
...
...
@@ -5031,7 +5037,7 @@ static int32_t translateAlterUser(STranslateContext* pCxt, SAlterUserStmt* pStmt
alterReq
.
sysInfo
=
pStmt
->
sysinfo
;
snprintf
(
alterReq
.
pass
,
sizeof
(
alterReq
.
pass
),
"%s"
,
pStmt
->
password
);
if
(
NULL
!=
pCxt
->
pParseCxt
->
db
)
{
snprintf
(
alterReq
.
dbname
,
sizeof
(
alterReq
.
db
name
),
"%s"
,
pCxt
->
pParseCxt
->
db
);
snprintf
(
alterReq
.
objname
,
sizeof
(
alterReq
.
obj
name
),
"%s"
,
pCxt
->
pParseCxt
->
db
);
}
return
buildCmdMsg
(
pCxt
,
TDMT_MND_ALTER_USER
,
(
FSerializeFunc
)
tSerializeSAlterUserReq
,
&
alterReq
);
...
...
@@ -5710,9 +5716,11 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
req
.
alterType
=
TSDB_ALTER_USER_ADD_READ_DB
;
}
else
if
(
PRIVILEGE_TYPE_TEST_MASK
(
pStmt
->
privileges
,
PRIVILEGE_TYPE_WRITE
))
{
req
.
alterType
=
TSDB_ALTER_USER_ADD_WRITE_DB
;
}
else
if
(
PRIVILEGE_TYPE_TEST_MASK
(
pStmt
->
privileges
,
PRIVILEGE_TYPE_SUBSCRIBE
))
{
req
.
alterType
=
TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC
;
}
strcpy
(
req
.
user
,
pStmt
->
userName
);
sprintf
(
req
.
dbname
,
"%d.%s"
,
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
db
Name
);
sprintf
(
req
.
objname
,
"%d.%s"
,
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
obj
Name
);
return
buildCmdMsg
(
pCxt
,
TDMT_MND_ALTER_USER
,
(
FSerializeFunc
)
tSerializeSAlterUserReq
,
&
req
);
}
...
...
@@ -5726,9 +5734,11 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
req
.
alterType
=
TSDB_ALTER_USER_REMOVE_READ_DB
;
}
else
if
(
PRIVILEGE_TYPE_TEST_MASK
(
pStmt
->
privileges
,
PRIVILEGE_TYPE_WRITE
))
{
req
.
alterType
=
TSDB_ALTER_USER_REMOVE_WRITE_DB
;
}
else
if
(
PRIVILEGE_TYPE_TEST_MASK
(
pStmt
->
privileges
,
PRIVILEGE_TYPE_SUBSCRIBE
))
{
req
.
alterType
=
TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC
;
}
strcpy
(
req
.
user
,
pStmt
->
userName
);
sprintf
(
req
.
dbname
,
"%d.%s"
,
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
db
Name
);
sprintf
(
req
.
objname
,
"%d.%s"
,
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
obj
Name
);
return
buildCmdMsg
(
pCxt
,
TDMT_MND_ALTER_USER
,
(
FSerializeFunc
)
tSerializeSAlterUserReq
,
&
req
);
}
...
...
@@ -7504,6 +7514,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case
QUERY_NODE_SHOW_CONSUMERS_STMT
:
case
QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT
:
case
QUERY_NODE_SHOW_TAGS_STMT
:
case
QUERY_NODE_SHOW_USER_PRIVILEGES_STMT
:
code
=
rewriteShow
(
pCxt
,
pQuery
);
break
;
case
QUERY_NODE_SHOW_VGROUPS_STMT
:
...
...
source/libs/parser/src/sql.c
浏览文件 @
395dedaa
此差异已折叠。
点击以展开。
source/libs/parser/test/mockCatalog.cpp
浏览文件 @
395dedaa
...
...
@@ -101,6 +101,10 @@ void generateInformationSchema(MockCatalogService* mcs) {
.
addColumn
(
"table_name"
,
TSDB_DATA_TYPE_BINARY
,
TSDB_TABLE_NAME_LEN
)
.
addColumn
(
"db_name"
,
TSDB_DATA_TYPE_BINARY
,
TSDB_DB_NAME_LEN
)
.
done
();
mcs
->
createTableBuilder
(
TSDB_INFORMATION_SCHEMA_DB
,
TSDB_INS_TABLE_USER_PRIVILEGES
,
TSDB_SYSTEM_TABLE
,
2
)
.
addColumn
(
"user_name"
,
TSDB_DATA_TYPE_BINARY
,
TSDB_USER_LEN
)
.
addColumn
(
"privilege"
,
TSDB_DATA_TYPE_BINARY
,
10
)
.
done
();
}
void
generatePerformanceSchema
(
MockCatalogService
*
mcs
)
{
...
...
@@ -248,8 +252,8 @@ int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, con
return
g_mockCatalogService
->
catalogGetTableDistVgInfo
(
pTableName
,
pVgList
);
}
int32_t
__catalogGetDBVgVersion
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
*
version
,
int64_t
*
dbId
,
int
32_t
*
tableNum
,
int
64_t
*
stateTs
)
{
int32_t
__catalogGetDBVgVersion
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
int32_t
*
version
,
int64_t
*
dbId
,
int32_t
*
tableNum
,
int64_t
*
stateTs
)
{
return
0
;
}
...
...
source/libs/parser/test/parAlterToBalanceTest.cpp
浏览文件 @
395dedaa
...
...
@@ -613,7 +613,7 @@ TEST_F(ParserInitialATest, alterUser) {
if
(
nullptr
!=
pPass
)
{
strcpy
(
expect
.
pass
,
pPass
);
}
strcpy
(
expect
.
db
name
,
"test"
);
strcpy
(
expect
.
obj
name
,
"test"
);
};
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
...
...
@@ -627,7 +627,7 @@ TEST_F(ParserInitialATest, alterUser) {
ASSERT_EQ
(
req
.
enable
,
expect
.
enable
);
ASSERT_EQ
(
std
::
string
(
req
.
user
),
std
::
string
(
expect
.
user
));
ASSERT_EQ
(
std
::
string
(
req
.
pass
),
std
::
string
(
expect
.
pass
));
ASSERT_EQ
(
std
::
string
(
req
.
dbname
),
std
::
string
(
expect
.
db
name
));
ASSERT_EQ
(
std
::
string
(
req
.
objname
),
std
::
string
(
expect
.
obj
name
));
});
setAlterUserReq
(
"wxy"
,
TSDB_ALTER_USER_PASSWD
,
"123456"
);
...
...
source/libs/parser/test/parExplainToSyncdbTest.cpp
浏览文件 @
395dedaa
...
...
@@ -34,10 +34,38 @@ TEST_F(ParserExplainToSyncdbTest, explain) {
TEST_F
(
ParserExplainToSyncdbTest
,
grant
)
{
useDb
(
"root"
,
"test"
);
SAlterUserReq
expect
=
{
0
};
auto
setAlterUserReq
=
[
&
](
int8_t
alterType
,
const
string
&
user
,
const
string
&
obj
)
{
expect
.
alterType
=
alterType
;
snprintf
(
expect
.
user
,
sizeof
(
expect
.
user
),
"%s"
,
user
.
c_str
());
snprintf
(
expect
.
objname
,
sizeof
(
expect
.
objname
),
"%s"
,
obj
.
c_str
());
};
setCheckDdlFunc
([
&
](
const
SQuery
*
pQuery
,
ParserStage
stage
)
{
ASSERT_EQ
(
nodeType
(
pQuery
->
pRoot
),
QUERY_NODE_GRANT_STMT
);
ASSERT_EQ
(
pQuery
->
pCmdMsg
->
msgType
,
TDMT_MND_ALTER_USER
);
SAlterUserReq
req
=
{
0
};
ASSERT_EQ
(
tDeserializeSAlterUserReq
(
pQuery
->
pCmdMsg
->
pMsg
,
pQuery
->
pCmdMsg
->
msgLen
,
&
req
),
TSDB_CODE_SUCCESS
);
ASSERT_EQ
(
req
.
alterType
,
expect
.
alterType
);
ASSERT_EQ
(
string
(
req
.
user
),
string
(
expect
.
user
));
ASSERT_EQ
(
string
(
req
.
objname
),
string
(
expect
.
objname
));
});
setAlterUserReq
(
TSDB_ALTER_USER_ADD_ALL_DB
,
"wxy"
,
"0.test"
);
run
(
"GRANT ALL ON test.* TO wxy"
);
setAlterUserReq
(
TSDB_ALTER_USER_ADD_READ_DB
,
"wxy"
,
"0.test"
);
run
(
"GRANT READ ON test.* TO wxy"
);
setAlterUserReq
(
TSDB_ALTER_USER_ADD_WRITE_DB
,
"wxy"
,
"0.test"
);
run
(
"GRANT WRITE ON test.* TO wxy"
);
setAlterUserReq
(
TSDB_ALTER_USER_ADD_ALL_DB
,
"wxy"
,
"0.test"
);
run
(
"GRANT READ, WRITE ON test.* TO wxy"
);
setAlterUserReq
(
TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC
,
"wxy"
,
"0.tp1"
);
run
(
"GRANT SUBSCRIBE ON tp1 TO wxy"
);
}
TEST_F
(
ParserExplainToSyncdbTest
,
insert
)
{
...
...
source/libs/parser/test/parShowToUse.cpp
浏览文件 @
395dedaa
...
...
@@ -213,7 +213,13 @@ TEST_F(ParserShowToUseTest, showTags) {
TEST_F
(
ParserShowToUseTest
,
showUsers
)
{
useDb
(
"root"
,
"test"
);
run
(
"SHOW users"
);
run
(
"SHOW USERS"
);
}
TEST_F
(
ParserShowToUseTest
,
showUserPrivileges
)
{
useDb
(
"root"
,
"test"
);
run
(
"SHOW USER PRIVILEGES"
);
}
TEST_F
(
ParserShowToUseTest
,
showVariables
)
{
...
...
tests/parallel_test/cases.task
浏览文件 @
395dedaa
...
...
@@ -418,7 +418,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
#
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
...
...
tests/system-test/0-others/compatibility.py
浏览文件 @
395dedaa
...
...
@@ -4,7 +4,7 @@ import sys
import
os
import
time
from
pathlib
import
Path
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
...
...
@@ -13,161 +13,167 @@ from util.dnodes import TDDnodes
from
util.dnodes
import
TDDnode
from
util.cluster
import
*
BASEVERSION
=
"3.0.1.8"
class
TDTestCase
:
def
caseDescription
(
self
):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.0
'''
return
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
getCfgPath
(
self
):
buildPath
=
self
.
getBuildPath
()
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
else
:
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
return
cfgPath
def
installTaosd
(
self
,
bPath
,
cPath
):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath
=
"/usr/local/src/"
packageName
=
"TDengine-server-3.0.1.0-Linux-x64.tar.gz"
os
.
system
(
f
"cd
{
packagePath
}
&& tar xvf TDengine-server-3.0.1.0-Linux-x64.tar.gz && cd TDengine-server-3.0.1.0 && ./install.sh -e no "
)
tdDnodes
.
stop
(
1
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
1
)
def
buildTaosd
(
self
,
bPath
):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os
.
system
(
f
" cd
{
bPath
}
&& make install "
)
def
run
(
self
):
print
(
f
"start taosd run"
)
bPath
=
self
.
getBuildPath
()
cPath
=
self
.
getCfgPath
()
dbname
=
"test"
stb
=
f
"
{
dbname
}
.meters"
self
.
installTaosd
(
bPath
,
cPath
)
os
.
system
(
"echo 'debugFlag 143' > /etc/taos/taos.cfg "
)
tableNumbers
=
100
recordNumbers1
=
100
recordNumbers2
=
1000
#tdsqlF=tdCom.newTdSql()
#print(tdsqlF)
oldServerVersion
=
'3.0.1.0'
#tdsqlF.query(f"SELECT SERVER_VERSION();")
#print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
#oldServerVersion=tdsqlF.queryResult[0][0]
#tdLog.info(f"Base server version is {oldServerVersion}")
#tdsqlF.query(f"SELECT CLIENT_VERSION();")
#
## the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
#oldClientVersion=tdsqlF.queryResult[0][0]
#tdLog.info(f"Base client version is {oldClientVersion}")
tdLog
.
printNoPrefix
(
f
"==========step1:prepare and check data in old version-
{
oldServerVersion
}
"
)
tdLog
.
info
(
f
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
os
.
system
(
f
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
sleep
(
3
)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os
.
system
(
"pkill taosd"
)
sleep
(
2
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
10
)
tdLog
.
info
(
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y "
)
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y"
)
os
.
system
(
"pkill -9 taosd"
)
tdLog
.
printNoPrefix
(
"==========step2:update new version "
)
self
.
buildTaosd
(
bPath
)
tdDnodes
.
start
(
1
)
sleep
(
1
)
tdsql
=
tdCom
.
newTdSql
()
print
(
tdsql
)
tdsql
.
query
(
f
"SELECT SERVER_VERSION();"
)
nowServerVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New server version is
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"SELECT CLIENT_VERSION();"
)
nowClientVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New client version is
{
nowClientVersion
}
"
)
tdLog
.
printNoPrefix
(
f
"==========step3:prepare and check data in new version-
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers1
)
os
.
system
(
f
"taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers2
}
-y "
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers2
)
tdsql
=
tdCom
.
newTdSql
()
tdLog
.
printNoPrefix
(
f
"==========step4:verify backticks in taos Sql-TD18542"
)
tdsql
.
execute
(
"drop database if exists db"
)
tdsql
.
execute
(
"create database db"
)
tdsql
.
execute
(
"use db"
)
tdsql
.
execute
(
"create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);"
)
tdsql
.
execute
(
"insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);"
)
tdsql
.
error
(
" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
error
(
" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
execute
(
"insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);"
)
tdsql
.
query
(
"select * from db.ct3"
)
tdsql
.
checkData
(
0
,
1
,
13
)
tdsql
.
execute
(
"insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);"
)
tdsql
.
query
(
"select * from db.ct4"
)
tdsql
.
checkData
(
0
,
1
,
14
)
tdsql
.
query
(
"describe information_schema.ins_databases;"
)
qRows
=
tdsql
.
queryRows
for
i
in
range
(
qRows
)
:
if
tdsql
.
queryResult
[
i
][
0
]
==
"retentions"
:
return
True
else
:
return
False
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
def
caseDescription
(
self
):
'''
3.0 data compatibility test
case1: basedata version is 3.0.1.8
'''
return
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
getCfgPath
(
self
):
buildPath
=
self
.
getBuildPath
()
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
else
:
cfgPath
=
buildPath
+
"/../sim/dnode1/cfg/"
return
cfgPath
def
installTaosd
(
self
,
bPath
,
cPath
):
# os.system(f"rmtaos && mkdir -p {self.getBuildPath()}/build/lib/temp && mv {self.getBuildPath()}/build/lib/libtaos.so* {self.getBuildPath()}/build/lib/temp/ ")
# os.system(f" mv {bPath}/build {bPath}/build_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so {self.getBuildPath()}/build/lib/libtaos.so_bak ")
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath
=
"/usr/local/src/"
packageName
=
"TDengine-server-"
+
BASEVERSION
+
"-Linux-x64.tar.gz"
packageTPath
=
packageName
.
split
(
"-Linux-"
)[
0
]
my_file
=
Path
(
f
"
{
packagePath
}
/
{
packageName
}
"
)
if
not
my_file
.
exists
():
print
(
f
"
{
packageName
}
is not exists"
)
os
.
system
(
f
"cd
{
packagePath
}
&& wget https://www.tdengine.com/assets-download/3.0/
{
packageName
}
"
)
else
:
print
(
f
"
{
packageName
}
has been exists"
)
os
.
system
(
f
"cd
{
packagePath
}
&& tar xvf
{
packageName
}
&& cd
{
packageTPath
}
&& ./install.sh -e no "
)
tdDnodes
.
stop
(
1
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
5
)
def
buildTaosd
(
self
,
bPath
):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os
.
system
(
f
" cd
{
bPath
}
&& make install "
)
def
run
(
self
):
bPath
=
self
.
getBuildPath
()
cPath
=
self
.
getCfgPath
()
dbname
=
"test"
stb
=
f
"
{
dbname
}
.meters"
self
.
installTaosd
(
bPath
,
cPath
)
os
.
system
(
"echo 'debugFlag 143' > /etc/taos/taos.cfg "
)
tableNumbers
=
100
recordNumbers1
=
100
recordNumbers2
=
1000
# tdsqlF=tdCom.newTdSql()
# print(tdsqlF)
# tdsqlF.query(f"SELECT SERVER_VERSION();")
# print(tdsqlF.query(f"SELECT SERVER_VERSION();"))
# oldServerVersion=tdsqlF.queryResult[0][0]
# tdLog.info(f"Base server version is {oldServerVersion}")
# tdsqlF.query(f"SELECT CLIENT_VERSION();")
# # the oldClientVersion can't be updated in the same python process,so the version is new compiled verison
# oldClientVersion=tdsqlF.queryResult[0][0]
# tdLog.info(f"Base client version is {oldClientVersion}")
# baseVersion = "3.0.1.8"
tdLog
.
printNoPrefix
(
f
"==========step1:prepare and check data in old version-
{
BASEVERSION
}
"
)
tdLog
.
info
(
f
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
os
.
system
(
f
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers1
}
-y "
)
sleep
(
3
)
# tdsqlF.query(f"select count(*) from {stb}")
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
os
.
system
(
"pkill taosd"
)
sleep
(
2
)
print
(
f
"start taosd: nohup taosd -c
{
cPath
}
& "
)
os
.
system
(
f
" nohup taosd -c
{
cPath
}
& "
)
sleep
(
10
)
tdLog
.
info
(
" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y "
)
os
.
system
(
"LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y"
)
os
.
system
(
"pkill -9 taosd"
)
tdLog
.
printNoPrefix
(
"==========step2:update new version "
)
self
.
buildTaosd
(
bPath
)
tdDnodes
.
start
(
1
)
sleep
(
1
)
tdsql
=
tdCom
.
newTdSql
()
print
(
tdsql
)
tdsql
.
query
(
f
"SELECT SERVER_VERSION();"
)
nowServerVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New server version is
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"SELECT CLIENT_VERSION();"
)
nowClientVersion
=
tdsql
.
queryResult
[
0
][
0
]
tdLog
.
info
(
f
"New client version is
{
nowClientVersion
}
"
)
tdLog
.
printNoPrefix
(
f
"==========step3:prepare and check data in new version-
{
nowServerVersion
}
"
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers1
)
os
.
system
(
f
"taosBenchmark -t
{
tableNumbers
}
-n
{
recordNumbers2
}
-y "
)
tdsql
.
query
(
f
"select count(*) from
{
stb
}
"
)
tdsql
.
checkData
(
0
,
0
,
tableNumbers
*
recordNumbers2
)
tdsql
.
query
(
f
"select count(*) from db4096.stb0"
)
tdsql
.
checkData
(
0
,
0
,
50000
)
tdsql
=
tdCom
.
newTdSql
()
tdLog
.
printNoPrefix
(
f
"==========step4:verify backticks in taos Sql-TD18542"
)
tdsql
.
execute
(
"drop database if exists db"
)
tdsql
.
execute
(
"create database db"
)
tdsql
.
execute
(
"use db"
)
tdsql
.
execute
(
"create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);"
)
tdsql
.
execute
(
"insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);"
)
tdsql
.
error
(
" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
error
(
" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);"
)
tdsql
.
execute
(
"insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);"
)
tdsql
.
query
(
"select * from db.ct3"
)
tdsql
.
checkData
(
0
,
1
,
13
)
tdsql
.
execute
(
"insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);"
)
tdsql
.
query
(
"select * from db.ct4"
)
tdsql
.
checkData
(
0
,
1
,
14
)
tdsql
.
query
(
"describe information_schema.ins_databases;"
)
qRows
=
tdsql
.
queryRows
for
i
in
range
(
qRows
)
:
if
tdsql
.
queryResult
[
i
][
0
]
==
"retentions"
:
return
True
else
:
return
False
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录