Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c2306088
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
c2306088
编写于
12月 24, 2021
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
TD-12506 merge 3.0
上级
e7ea3508
8e08a5fa
变更
20
显示空白变更内容
内联
并排
Showing
20 changed file
with
543 addition
and
207 deletion
+543
-207
include/common/taosmsg.h
include/common/taosmsg.h
+12
-4
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+6
-6
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+3
-3
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+92
-92
source/dnode/mgmt/impl/test/db/db.cpp
source/dnode/mgmt/impl/test/db/db.cpp
+23
-19
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+10
-0
source/dnode/vnode/impl/CMakeLists.txt
source/dnode/vnode/impl/CMakeLists.txt
+2
-1
source/dnode/vnode/impl/inc/vnodeDef.h
source/dnode/vnode/impl/inc/vnodeDef.h
+2
-0
source/dnode/vnode/impl/inc/vnodeQuery.h
source/dnode/vnode/impl/inc/vnodeQuery.h
+31
-0
source/dnode/vnode/impl/src/vnodeInt.c
source/dnode/vnode/impl/src/vnodeInt.c
+0
-10
source/dnode/vnode/impl/src/vnodeMain.c
source/dnode/vnode/impl/src/vnodeMain.c
+5
-0
source/dnode/vnode/impl/src/vnodeQuery.c
source/dnode/vnode/impl/src/vnodeQuery.c
+35
-0
source/dnode/vnode/meta/inc/metaQuery.h
source/dnode/vnode/meta/inc/metaQuery.h
+3
-3
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+0
-2
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+138
-23
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+27
-11
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+147
-30
tests/script/general/db/basic1.sim
tests/script/general/db/basic1.sim
+2
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+4
-0
未找到文件。
include/common/taosmsg.h
浏览文件 @
c2306088
...
...
@@ -587,10 +587,6 @@ typedef struct {
typedef
struct
{
int32_t
code
;
union
{
uint64_t
qhandle
;
uint64_t
qId
;
};
// query handle
}
SQueryTableRsp
;
// todo: the show handle should be replaced with id
...
...
@@ -1121,6 +1117,10 @@ typedef struct SResReadyMsg {
uint64_t
taskId
;
}
SResReadyMsg
;
typedef
struct
SResReadyRsp
{
int32_t
code
;
}
SResReadyRsp
;
typedef
struct
SResFetchMsg
{
uint64_t
schedulerId
;
uint64_t
queryId
;
...
...
@@ -1149,12 +1149,20 @@ typedef struct STaskCancelMsg {
uint64_t
taskId
;
}
STaskCancelMsg
;
typedef
struct
STaskCancelRsp
{
int32_t
code
;
}
STaskCancelRsp
;
typedef
struct
STaskDropMsg
{
uint64_t
schedulerId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
STaskDropMsg
;
typedef
struct
STaskDropRsp
{
int32_t
code
;
}
STaskDropRsp
;
#pragma pack(pop)
...
...
include/libs/qworker/qworker.h
浏览文件 @
c2306088
...
...
@@ -42,17 +42,17 @@ typedef struct {
int32_t
qWorkerInit
(
SQWorkerCfg
*
cfg
,
void
**
qWorkerMgmt
);
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
rsp
);
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
);
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
);
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
);
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
);
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
);
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
void
qWorkerDestroy
(
void
**
qWorkerMgmt
);
...
...
source/client/src/clientImpl.c
浏览文件 @
c2306088
...
...
@@ -248,7 +248,7 @@ _return:
int
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
)
{
pEpSet
->
version
=
0
;
// init m
gmt
ip set
// init m
node
ip set
SEpSet
*
mgmtEpSet
=
&
(
pEpSet
->
epSet
);
mgmtEpSet
->
numOfEps
=
0
;
mgmtEpSet
->
inUse
=
0
;
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
c2306088
source/client/test/clientTests.cpp
浏览文件 @
c2306088
...
...
@@ -49,101 +49,101 @@ int main(int argc, char** argv) {
TEST
(
testCase
,
driverInit_Test
)
{
taos_init
();
}
// TEST(testCase, connect_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
// taos_close(pConn);
//}
//
// TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, create_account_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
TEST
(
testCase
,
connect_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, drop_account_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
taos_close
(
pConn
);
}
TEST
(
testCase
,
create_user_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create user abc pass 'abc'"
);
if
(
taos_errno
(
pRes
)
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to create user, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
create_account_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create account aabc pass 'abc'"
);
if
(
taos_errno
(
pRes
)
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to create user, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
drop_account_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop account aabc"
);
if
(
taos_errno
(
pRes
)
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to create user, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
show_user_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc
");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn)
;
//}
//
// TEST(testCase, show_user_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show users");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes
);
// int32_t numOfFields = taos_num_fields(pRes
);
//
// char str[512] = {0}
;
// while((pRow = taos_fetch_row(pRes)) != NULL
) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields
);
// printf("%s\n", str);
// }
//
//
taos_close(pConn);
//
}
//
// TEST(testCase, drop_user
_Test) {
//
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show users
"
);
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
}
;
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_close
(
pConn
);
}
TEST
(
testCase
,
drop_user_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"drop user abc"
)
;
if
(
taos_errno
(
pRes
)
!=
TSDB_CODE_SUCCESS
)
{
printf
(
"failed to create user, reason:%s
\n
"
,
taos_errstr
(
pRes
)
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
TEST
(
testCase
,
show_db
_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
// TEST(testCase, show_db_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show databases");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show databases"
);
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_close
(
pConn
);
}
TEST
(
testCase
,
create_db_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"ubuntu"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
source/dnode/mgmt/impl/test/db/db.cpp
浏览文件 @
c2306088
...
...
@@ -27,24 +27,25 @@ Testbase DndTestDb::test;
TEST_F
(
DndTestDb
,
01
_ShowDb
)
{
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
""
);
CHECK_META
(
"show databases"
,
1
7
);
CHECK_META
(
"show databases"
,
1
8
);
CHECK_SCHEMA
(
0
,
TSDB_DATA_TYPE_BINARY
,
TSDB_DB_NAME_LEN
-
1
+
VARSTR_HEADER_SIZE
,
"name"
);
CHECK_SCHEMA
(
1
,
TSDB_DATA_TYPE_TIMESTAMP
,
8
,
"create_time"
);
CHECK_SCHEMA
(
2
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"vgroups"
);
CHECK_SCHEMA
(
3
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"replica"
);
CHECK_SCHEMA
(
4
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"quorum"
);
CHECK_SCHEMA
(
5
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"days"
);
CHECK_SCHEMA
(
6
,
TSDB_DATA_TYPE_BINARY
,
24
+
VARSTR_HEADER_SIZE
,
"keep0,keep1,keep2"
);
CHECK_SCHEMA
(
7
,
TSDB_DATA_TYPE_INT
,
4
,
"cache"
);
CHECK_SCHEMA
(
8
,
TSDB_DATA_TYPE_INT
,
4
,
"blocks"
);
CHECK_SCHEMA
(
9
,
TSDB_DATA_TYPE_INT
,
4
,
"minrows"
);
CHECK_SCHEMA
(
10
,
TSDB_DATA_TYPE_INT
,
4
,
"maxrows"
);
CHECK_SCHEMA
(
11
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"wallevel"
);
CHECK_SCHEMA
(
12
,
TSDB_DATA_TYPE_INT
,
4
,
"fsync"
);
CHECK_SCHEMA
(
13
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"comp"
);
CHECK_SCHEMA
(
14
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"cachelast"
);
CHECK_SCHEMA
(
15
,
TSDB_DATA_TYPE_BINARY
,
3
+
VARSTR_HEADER_SIZE
,
"precision"
);
CHECK_SCHEMA
(
16
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"update"
);
CHECK_SCHEMA
(
3
,
TSDB_DATA_TYPE_INT
,
4
,
"ntables"
);
CHECK_SCHEMA
(
4
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"replica"
);
CHECK_SCHEMA
(
5
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"quorum"
);
CHECK_SCHEMA
(
6
,
TSDB_DATA_TYPE_SMALLINT
,
2
,
"days"
);
CHECK_SCHEMA
(
7
,
TSDB_DATA_TYPE_BINARY
,
24
+
VARSTR_HEADER_SIZE
,
"keep0,keep1,keep2"
);
CHECK_SCHEMA
(
8
,
TSDB_DATA_TYPE_INT
,
4
,
"cache"
);
CHECK_SCHEMA
(
9
,
TSDB_DATA_TYPE_INT
,
4
,
"blocks"
);
CHECK_SCHEMA
(
10
,
TSDB_DATA_TYPE_INT
,
4
,
"minrows"
);
CHECK_SCHEMA
(
11
,
TSDB_DATA_TYPE_INT
,
4
,
"maxrows"
);
CHECK_SCHEMA
(
12
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"wallevel"
);
CHECK_SCHEMA
(
13
,
TSDB_DATA_TYPE_INT
,
4
,
"fsync"
);
CHECK_SCHEMA
(
14
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"comp"
);
CHECK_SCHEMA
(
15
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"cachelast"
);
CHECK_SCHEMA
(
16
,
TSDB_DATA_TYPE_BINARY
,
3
+
VARSTR_HEADER_SIZE
,
"precision"
);
CHECK_SCHEMA
(
17
,
TSDB_DATA_TYPE_TINYINT
,
1
,
"update"
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
0
);
...
...
@@ -82,13 +83,14 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
}
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
""
);
CHECK_META
(
"show databases"
,
1
7
);
CHECK_META
(
"show databases"
,
1
8
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
CheckBinary
(
"d1"
,
TSDB_DB_NAME_LEN
-
1
);
CheckTimestamp
();
CheckInt16
(
2
);
// vgroups
CheckInt32
(
0
);
// ntables
CheckInt16
(
1
);
// replica
CheckInt16
(
1
);
// quorum
CheckInt16
(
10
);
// days
...
...
@@ -147,6 +149,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary
(
"d1"
,
TSDB_DB_NAME_LEN
-
1
);
CheckTimestamp
();
CheckInt16
(
2
);
// vgroups
CheckInt32
(
0
);
CheckInt16
(
1
);
// replica
CheckInt16
(
2
);
// quorum
CheckInt16
(
10
);
// days
...
...
@@ -166,7 +169,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
test
.
Restart
();
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
""
);
CHECK_META
(
"show databases"
,
1
7
);
CHECK_META
(
"show databases"
,
1
8
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
...
...
@@ -174,6 +177,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary
(
"d1"
,
TSDB_DB_NAME_LEN
-
1
);
CheckTimestamp
();
CheckInt16
(
2
);
// vgroups
CheckInt32
(
0
);
CheckInt16
(
1
);
// replica
CheckInt16
(
2
);
// quorum
CheckInt16
(
10
);
// days
...
...
@@ -201,7 +205,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
}
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
""
);
CHECK_META
(
"show databases"
,
1
7
);
CHECK_META
(
"show databases"
,
1
8
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
0
);
...
...
@@ -239,7 +243,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
}
test
.
SendShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
""
);
CHECK_META
(
"show databases"
,
1
7
);
CHECK_META
(
"show databases"
,
1
8
);
test
.
SendShowRetrieveMsg
();
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
c2306088
...
...
@@ -876,6 +876,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"ntables"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"replica"
);
...
...
@@ -1017,6 +1023,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
*
(
int16_t
*
)
pWrite
=
pDb
->
cfg
.
numOfVgroups
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
0
;
// todo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pDb
->
cfg
.
replications
;
cols
++
;
...
...
source/dnode/vnode/impl/CMakeLists.txt
浏览文件 @
c2306088
...
...
@@ -15,6 +15,7 @@ target_link_libraries(
PUBLIC wal
PUBLIC sync
PUBLIC cjson
PUBLIC qworker
)
# test
...
...
source/dnode/vnode/impl/inc/vnodeDef.h
浏览文件 @
c2306088
...
...
@@ -34,6 +34,7 @@
#include "vnodeRequest.h"
#include "vnodeStateMgr.h"
#include "vnodeSync.h"
#include "vnodeQuery.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -72,6 +73,7 @@ struct SVnode {
SVnodeSync
*
pSync
;
SVnodeFS
*
pFs
;
tsem_t
canCommit
;
void
*
pQuery
;
};
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
...
...
source/dnode/vnode/impl/inc/vnodeQuery.h
0 → 100644
浏览文件 @
c2306088
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "vnodeInt.h"
#include "qworker.h"
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_READ_H_*/
source/dnode/vnode/impl/src/vnodeInt.c
浏览文件 @
c2306088
...
...
@@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; }
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
return
0
;
}
int
vnodeProcessQueryReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"query message is processed"
);
return
0
;
}
int
vnodeProcessFetchReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"fetch message is processed"
);
return
0
;
}
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"sync message is processed"
);
return
0
;
...
...
source/dnode/vnode/impl/src/vnodeMain.c
浏览文件 @
c2306088
...
...
@@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return
-
1
;
}
// Open Query
if
(
vnodeQueryOpen
(
pVnode
))
{
return
-
1
;
}
// TODO
return
0
;
}
...
...
source/dnode/vnode/impl/src/vnodeQuery.c
0 → 100644
浏览文件 @
c2306088
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
#include "vnodeQuery.h"
int
vnodeQueryOpen
(
SVnode
*
pVnode
)
{
return
qWorkerInit
(
NULL
,
&
pVnode
->
pQuery
);
}
int
vnodeProcessQueryReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"query message is processed"
);
qWorkerProcessQueryMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
0
;
}
int
vnodeProcessFetchReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"fetch message is processed"
);
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
0
;
}
source/dnode/vnode/meta/inc/metaQuery.h
浏览文件 @
c2306088
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _
TD_META
_QUERY_H_
#define _
TD_META
_QUERY_H_
#ifndef _
VNODE
_QUERY_H_
#define _
VNODE
_QUERY_H_
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -24,4 +24,4 @@ extern "C" {
}
#endif
#endif
/*_TD_META_QUERY_H_*/
\ No newline at end of file
#endif
/*_VNODE_QUERY_H_*/
\ No newline at end of file
source/libs/parser/src/astValidate.c
浏览文件 @
c2306088
...
...
@@ -4457,8 +4457,6 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
strncpy
(
pCreateMsg
->
db
,
token
.
z
,
token
.
n
);
pDcl
->
pMsg
=
(
char
*
)
pCreateMsg
;
pDcl
->
msgLen
=
sizeof
(
SCreateDbMsg
);
pDcl
->
msgType
=
(
pInfo
->
type
==
TSDB_SQL_CREATE_DB
)
?
TSDB_MSG_TYPE_CREATE_DB
:
TSDB_MSG_TYPE_ALTER_DB
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
c2306088
...
...
@@ -553,28 +553,122 @@ _return:
int32_t
qwBuildAndSendQueryRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendReadyRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SResReadyRsp
*
pRsp
=
(
SResReadyRsp
*
)
rpcMallocCont
(
sizeof
(
SResReadyRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendStatusRsp
(
SRpcMsg
*
pMsg
,
SSchedulerStatusRsp
*
sStatus
)
{
int32_t
size
=
0
;
if
(
sStatus
)
{
size
=
sizeof
(
SSchedulerStatusRsp
)
+
sizeof
(
sStatus
->
status
[
0
])
*
sStatus
->
num
;
}
else
{
size
=
sizeof
(
SSchedulerStatusRsp
);
}
SSchedulerStatusRsp
*
pRsp
=
(
SSchedulerStatusRsp
*
)
rpcMallocCont
(
size
);
if
(
sStatus
)
{
memcpy
(
pRsp
,
sStatus
,
size
);
}
else
{
pRsp
->
num
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
size
,
.
code
=
0
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
SRpcMsg
*
pMsg
,
void
*
data
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
//TODO fill msg
pRsp
->
completed
=
true
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCancelRsp
(
SRpcMsg
*
pMsg
)
{
int32_t
qwBuildAndSendCancelRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
STaskCancelRsp
*
pRsp
=
(
STaskCancelRsp
*
)
rpcMallocCont
(
sizeof
(
STaskCancelRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendDropRsp
(
SRpcMsg
*
pMsg
)
{
int32_t
qwBuildAndSendDropRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
STaskDropRsp
*
pRsp
=
(
STaskDropRsp
*
)
rpcMallocCont
(
sizeof
(
STaskDropRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -712,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
SQWorkerSchStatus
*
sch
=
NULL
;
SQWorkerTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
int32_t
needRsp
=
true
;
void
*
data
=
NULL
;
QW_ERR_RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
));
QW_ERR_
J
RET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
schedulerId
,
&
sch
));
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
QW_LOCK
(
QW_READ
,
&
task
->
lock
);
...
...
@@ -724,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
}
if
(
QW_GOT_RES_DATA
(
res
->
data
))
{
QW_ERR_JRET
(
qwBuildAndSendFetchRsp
(
pMsg
,
res
->
data
))
;
data
=
res
->
data
;
if
(
QW_LOW_RES_DATA
(
res
->
data
))
{
if
(
task
->
status
==
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
//TODO add query back to queue
...
...
@@ -737,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
}
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
needRsp
=
false
;
}
_return:
...
...
@@ -746,9 +844,12 @@ _return:
if
(
sch
)
{
qwReleaseTask
(
QW_READ
,
sch
);
qwReleaseScheduler
(
QW_READ
,
mgmt
);
}
qwReleaseScheduler
(
QW_READ
,
mgmt
);
if
(
needRsp
)
{
qwBuildAndSendFetchRsp
(
pMsg
,
res
->
data
);
}
QW_RET
(
code
);
}
...
...
@@ -832,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
rsp
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid query msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -851,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
QW_ERR_JRET
(
qwCheckTaskCancelDrop
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
,
&
needStop
));
if
(
needStop
)
{
qWarn
(
"task need stop"
);
QW_ERR_RET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
QW_ERR_
J
RET
(
TSDB_CODE_QRY_TASK_CANCELLED
);
}
code
=
qStringToSubplan
(
msg
->
msg
,
&
plan
);
...
...
@@ -910,13 +1012,14 @@ _return:
QW_RET
(
code
);
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SResReadyMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
...
...
@@ -925,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SSchTasksStatusMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSchedulerStatusRsp
*
sStatus
=
NULL
;
QW_ERR_RET
(
qwGetSchTasksStatus
(
qWorkerMgmt
,
msg
->
schedulerId
,
&
sStatus
));
QW_ERR_JRET
(
qwGetSchTasksStatus
(
qWorkerMgmt
,
msg
->
schedulerId
,
&
sStatus
));
_return:
QW_ERR_RET
(
qwBuildAndSendStatusRsp
(
pMsg
,
sStatus
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
...
...
@@ -971,36 +1078,44 @@ _return:
QW_RET
(
code
);
}
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskCancelMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_RET
(
qwCancelTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
QW_ERR_
J
RET
(
qwCancelTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
QW_ERR_RET
(
qwBuildAndSendCancelRsp
(
pMsg
));
_return:
QW_ERR_RET
(
qwBuildAndSendCancelRsp
(
pMsg
,
code
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
rsp
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
||
NULL
==
rsp
)
{
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskDropMsg
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
qError
(
"invalid task drop msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
QW_ERR_RET
(
qwCancelDropTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
QW_ERR_JRET
(
qwCancelDropTask
(
qWorkerMgmt
,
msg
->
schedulerId
,
msg
->
queryId
,
msg
->
taskId
));
_return:
QW_ERR_RET
(
qwBuildAndSendDropRsp
(
pMsg
));
QW_ERR_RET
(
qwBuildAndSendDropRsp
(
pMsg
,
code
));
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
c2306088
...
...
@@ -31,17 +31,35 @@ extern "C" {
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
enum
{
SCH_READ
=
1
,
SCH_WRITE
,
};
typedef
struct
SSchedulerMgmt
{
uint64_t
taskId
;
uint64_t
schedulerId
;
SSchedulerCfg
cfg
;
SHashObj
*
J
obs
;
// key: queryId, value: SQueryJob*
SHashObj
*
j
obs
;
// key: queryId, value: SQueryJob*
}
SSchedulerMgmt
;
typedef
struct
SQueryLevel
{
int32_t
level
;
int8_t
status
;
SRWLatch
lock
;
int32_t
taskFailed
;
int32_t
taskSucceed
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
}
SQueryLevel
;
typedef
struct
SQueryTask
{
uint64_t
taskId
;
// task id
SQueryLevel
*
level
;
// level
SSubplan
*
plan
;
// subplan
char
*
msg
;
// operator tree
int32_t
msgLen
;
// msg length
int8_t
status
;
// task status
SEpAddr
execAddr
;
// task actual executed node address
SQueryProfileSummary
summary
;
// task execution summary
...
...
@@ -50,13 +68,6 @@ typedef struct SQueryTask {
SArray
*
parents
;
// the data destination tasks, get data from current task, element is SQueryTask*
}
SQueryTask
;
typedef
struct
SQueryLevel
{
int32_t
level
;
int8_t
status
;
int32_t
taskNum
;
SArray
*
subTasks
;
// Element is SQueryTask
}
SQueryLevel
;
typedef
struct
SQueryJob
{
uint64_t
queryId
;
int32_t
levelNum
;
...
...
@@ -74,6 +85,7 @@ typedef struct SQueryJob {
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
SArray
*
levels
;
// Element is SQueryLevel, starting from 0.
SArray
*
subPlans
;
// Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
...
...
@@ -81,7 +93,8 @@ typedef struct SQueryJob {
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
...
...
@@ -91,6 +104,9 @@ typedef struct SQueryJob {
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern
int32_t
schLaunchTask
(
SQueryJob
*
job
,
SQueryTask
*
task
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
c2306088
...
...
@@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SQueryLevel
level
=
{
0
};
SArray
*
levelPlans
=
NULL
;
int32_t
levelPlanNum
=
0
;
SQueryLevel
*
pLevel
=
NULL
;
level
.
status
=
JOB_TASK_STATUS_NOT_START
;
for
(
int32_t
i
=
0
;
i
<
levelNum
;
++
i
)
{
level
.
level
=
i
;
if
(
NULL
==
taosArrayPush
(
job
->
levels
,
&
level
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
pLevel
=
taosArrayGet
(
job
->
levels
,
i
);
pLevel
->
level
=
i
;
levelPlans
=
taosArrayGetP
(
dag
->
pSubplans
,
i
);
if
(
NULL
==
levelPlans
)
{
qError
(
"no level plans for level %d"
,
i
);
...
...
@@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
level
.
taskNum
=
levelPlanNum
;
pLevel
->
taskNum
=
levelPlanNum
;
level
.
subTasks
=
taosArrayInit
(
levelPlanNum
,
sizeof
(
SQueryTask
));
if
(
NULL
==
level
.
subTasks
)
{
pLevel
->
subTasks
=
taosArrayInit
(
levelPlanNum
,
sizeof
(
SQueryTask
));
if
(
NULL
==
pLevel
->
subTasks
)
{
qError
(
"taosArrayInit %d failed"
,
levelPlanNum
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
task
.
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
task
.
plan
=
plan
;
task
.
level
=
pLevel
;
task
.
status
=
JOB_TASK_STATUS_NOT_START
;
void
*
p
=
taosArrayPush
(
level
.
subTasks
,
&
task
);
void
*
p
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
p
)
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
...
@@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
}
}
if
(
NULL
==
taosArrayPush
(
job
->
levels
,
&
level
))
{
qError
(
"taosArrayPush failed"
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_ERR_JRET
(
schBuildTaskRalation
(
job
,
planToTask
));
...
...
@@ -220,8 +225,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
_return:
if
(
level
.
subTasks
)
{
taosArrayDestroy
(
level
.
subTasks
);
if
(
pLevel
->
subTasks
)
{
taosArrayDestroy
(
pLevel
->
subTasks
);
}
if
(
planToTask
)
{
...
...
@@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
return
TSDB_CODE_SUCCESS
;
}
if
(
0
!=
taosHashPut
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
job
->
succTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
moved
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
schMoveTaskToFailList
(
SQueryJob
*
job
,
SQueryTask
*
task
,
bool
*
moved
)
{
if
(
0
!=
taosHashRemove
(
job
->
execTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
)))
{
qWarn
(
"remove task[%"
PRIx64
"] from execTasks failed"
,
task
->
taskId
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
!=
taosHashPut
(
job
->
failTasks
,
&
task
->
taskId
,
sizeof
(
task
->
taskId
),
&
task
,
POINTER_BYTES
))
{
qError
(
"taosHashPut failed"
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
void
*
msg
=
NULL
;
switch
(
msgType
)
{
case
TSDB_MSG_TYPE_SUBMIT
:
{
if
(
NULL
==
task
->
msg
||
task
->
msgLen
<=
0
)
{
qError
(
"submit msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
msgSize
=
task
->
msgLen
;
msg
=
task
->
msg
;
break
;
}
case
TSDB_MSG_TYPE_QUERY
:
{
if
(
NULL
==
task
->
msg
)
{
qError
(
"query msg is NULL"
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
int32_t
len
=
strlen
(
task
->
msg
);
msgSize
=
sizeof
(
SSubQueryMsg
)
+
len
+
1
;
msgSize
=
sizeof
(
SSubQueryMsg
)
+
task
->
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
...
...
@@ -308,9 +338,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
pMsg
->
contentLen
=
htonl
(
len
);
memcpy
(
pMsg
->
msg
,
task
->
msg
,
len
);
pMsg
->
msg
[
len
]
=
0
;
pMsg
->
contentLen
=
htonl
(
task
->
msgLen
);
memcpy
(
pMsg
->
msg
,
task
->
msg
,
task
->
msgLen
);
break
;
}
case
TSDB_MSG_TYPE_RES_READY
:
{
...
...
@@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
SResReadyMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
...
...
@@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
SResFetchMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
}
case
TSDB_MSG_TYPE_DROP_TASK
:{
msgSize
=
sizeof
(
STaskDropMsg
);
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskDropMsg
*
pMsg
=
msg
;
pMsg
->
schedulerId
=
htobe64
(
schMgmt
.
schedulerId
);
pMsg
->
queryId
=
htobe64
(
job
->
queryId
);
pMsg
->
taskId
=
htobe64
(
task
->
taskId
);
break
;
...
...
@@ -345,6 +390,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
//TODO SEND MSG
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
taskDone
=
0
;
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
SCH_LOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
task
->
level
->
taskFailed
++
;
taskDone
=
task
->
level
->
taskSucceed
+
task
->
level
->
taskFailed
;
SCH_UNLOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
if
(
taskDone
<
task
->
level
->
taskNum
)
{
qDebug
(
"wait all tasks, done:%d, all:%d"
,
taskDone
,
task
->
level
->
taskNum
);
return
TSDB_CODE_SUCCESS
;
}
if
(
task
->
level
->
taskFailed
>
0
)
{
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
return
TSDB_CODE_SUCCESS
;
}
}
else
{
strncpy
(
job
->
resEp
.
fqdn
,
task
->
execAddr
.
fqdn
,
sizeof
(
job
->
resEp
.
fqdn
));
job
->
resEp
.
port
=
task
->
execAddr
.
port
;
}
SCH_ERR_RET
(
schProcessOnJobSuccess
(
job
));
...
...
@@ -457,11 +524,31 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
int32_t
schProcessOnTaskFailure
(
SQueryJob
*
job
,
SQueryTask
*
task
,
int32_t
errCode
)
{
bool
needRetry
=
false
;
bool
moved
=
false
;
int32_t
taskDone
=
0
;
SCH_ERR_RET
(
schTaskCheckAndSetRetry
(
job
,
task
,
errCode
,
&
needRetry
));
if
(
!
needRetry
)
{
SCH_TASK_ERR_LOG
(
"task failed[%x], no more retry"
,
errCode
);
SCH_ERR_RET
(
schMoveTaskToFailList
(
job
,
task
,
&
moved
));
if
(
!
moved
)
{
SCH_TASK_ERR_LOG
(
"task may already moved, status:%d"
,
task
->
status
);
return
TSDB_CODE_SUCCESS
;
}
if
(
SCH_TASK_NEED_WAIT_ALL
(
task
))
{
SCH_LOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
task
->
level
->
taskFailed
++
;
taskDone
=
task
->
level
->
taskSucceed
+
task
->
level
->
taskFailed
;
SCH_UNLOCK
(
SCH_WRITE
,
&
task
->
level
->
lock
);
if
(
taskDone
<
task
->
level
->
taskNum
)
{
qDebug
(
"wait all tasks, done:%d, all:%d"
,
taskDone
,
task
->
level
->
taskNum
);
return
TSDB_CODE_SUCCESS
;
}
}
job
->
status
=
JOB_TASK_STATUS_FAILED
;
SCH_ERR_RET
(
schProcessOnJobFailure
(
job
));
...
...
@@ -522,8 +609,7 @@ _return:
int32_t
schLaunchTask
(
SQueryJob
*
job
,
SQueryTask
*
task
)
{
SSubplan
*
plan
=
task
->
plan
;
int32_t
len
=
0
;
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
len
));
SCH_ERR_RET
(
qSubPlanToString
(
plan
,
&
task
->
msg
,
&
task
->
msgLen
));
if
(
plan
->
execEpSet
.
numOfEps
<=
0
)
{
SCH_ERR_RET
(
schSetTaskExecEpSet
(
job
,
&
plan
->
execEpSet
));
}
...
...
@@ -533,7 +619,9 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
SCH_ERR_RET
(
schAsyncSendMsg
(
job
,
task
,
TSDB_MSG_TYPE_QUERY
));
int32_t
msgType
=
(
plan
->
type
==
QUERY_TYPE_MODIFY
)
?
TSDB_MSG_TYPE_SUBMIT
:
TSDB_MSG_TYPE_QUERY
;
SCH_ERR_RET
(
schAsyncSendMsg
(
job
,
task
,
msgType
));
SCH_ERR_RET
(
schPushTaskToExecList
(
job
,
task
));
...
...
@@ -554,6 +642,25 @@ int32_t schLaunchJob(SQueryJob *job) {
return
TSDB_CODE_SUCCESS
;
}
void
schDropJobAllTasks
(
SQueryJob
*
job
)
{
void
*
pIter
=
taosHashIterate
(
job
->
succTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
schAsyncSendMsg
(
job
,
task
,
TSDB_MSG_TYPE_DROP_TASK
);
pIter
=
taosHashIterate
(
job
->
succTasks
,
pIter
);
}
pIter
=
taosHashIterate
(
job
->
failTasks
,
NULL
);
while
(
pIter
)
{
SQueryTask
*
task
=
*
(
SQueryTask
**
)
pIter
;
schAsyncSendMsg
(
job
,
task
,
TSDB_MSG_TYPE_DROP_TASK
);
pIter
=
taosHashIterate
(
job
->
succTasks
,
pIter
);
}
}
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
)
{
if
(
cfg
)
{
...
...
@@ -562,8 +669,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt
.
cfg
.
maxJobNum
=
SCHEDULE_DEFAULT_JOB_NUMBER
;
}
schMgmt
.
J
obs
=
taosHashInit
(
schMgmt
.
cfg
.
maxJobNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
schMgmt
.
J
obs
)
{
schMgmt
.
j
obs
=
taosHashInit
(
schMgmt
.
cfg
.
maxJobNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
schMgmt
.
j
obs
)
{
SCH_ERR_LRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
,
"init %d schduler jobs failed"
,
schMgmt
.
cfg
.
maxJobNum
);
}
...
...
@@ -605,9 +712,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
job
->
failTasks
=
taosHashInit
(
pDag
->
numOfSubplans
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
job
->
failTasks
)
{
qError
(
"taosHashInit %d failed"
,
pDag
->
numOfSubplans
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
tsem_init
(
&
job
->
rspSem
,
0
,
0
);
if
(
0
!=
taosHashPut
(
schMgmt
.
J
obs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
),
&
job
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
schMgmt
.
j
obs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
),
&
job
,
POINTER_BYTES
))
{
qError
(
"taosHashPut queryId:%"
PRIx64
" failed"
,
job
->
queryId
);
SCH_ERR_JRET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
...
...
@@ -659,6 +772,8 @@ _return:
int32_t
scheduleCancelJob
(
void
*
pJob
)
{
//TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -670,7 +785,7 @@ void scheduleFreeJob(void *pJob) {
SQueryJob
*
job
=
pJob
;
if
(
job
->
status
>
0
)
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
J
obs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
)))
{
if
(
0
!=
taosHashRemove
(
schMgmt
.
j
obs
,
&
job
->
queryId
,
sizeof
(
job
->
queryId
)))
{
qError
(
"remove job:%"
PRIx64
"from mgmt failed"
,
job
->
queryId
);
// maybe already freed
return
;
}
...
...
@@ -678,15 +793,17 @@ void scheduleFreeJob(void *pJob) {
if
(
job
->
status
==
JOB_TASK_STATUS_EXECUTING
)
{
scheduleCancelJob
(
pJob
);
}
schDropJobAllTasks
(
job
);
}
//TODO free job
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
J
obs
)
{
taosHashCleanup
(
schMgmt
.
J
obs
);
//TODO
schMgmt
.
J
obs
=
NULL
;
if
(
schMgmt
.
j
obs
)
{
taosHashCleanup
(
schMgmt
.
j
obs
);
//TODO
schMgmt
.
j
obs
=
NULL
;
}
}
...
...
tests/script/general/db/basic1.sim
浏览文件 @
c2306088
...
...
@@ -15,7 +15,7 @@ if $data00 != d1 then
return -1
endi
if $data02 !=
0
then
if $data02 !=
2
then
return -1
endi
...
...
@@ -51,7 +51,7 @@ if $data00 != d4 then
return -1
endi
if $data02 !=
0
then
if $data02 !=
2
then
return -1
endi
...
...
tests/script/jenkins/basic.txt
浏览文件 @
c2306088
#======================b1-start===============
# ---- user
./test.sh -f general/user/basic1.sim
# ---- db
./test.sh -f general/db/basic1.sim
#======================b1-end===============
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录