Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6d9ae1af
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6d9ae1af
编写于
1月 18, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-11818]Support select * from super_table.
上级
3b23380c
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
240 addition
and
177 deletion
+240
-177
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+14
-3
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+96
-96
source/dnode/vnode/inc/meta.h
source/dnode/vnode/inc/meta.h
+1
-1
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+19
-0
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+8
-6
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+19
-23
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+47
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-11
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+21
-7
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+0
-16
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+11
-9
未找到文件。
source/client/src/clientImpl.c
浏览文件 @
6d9ae1af
...
@@ -257,7 +257,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
...
@@ -257,7 +257,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return
pRequest
->
code
;
return
pRequest
->
code
;
}
}
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
NULL
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
SArray
*
execNode
=
taosArrayInit
(
4
,
sizeof
(
SQueryNodeAddr
));
SQueryNodeAddr
addr
=
{.
numOfEps
=
1
,
.
inUse
=
0
,
.
nodeId
=
0
};
addr
.
epAddr
[
0
].
port
=
6030
;
strcpy
(
addr
.
epAddr
[
0
].
fqdn
,
"ubuntu"
);
taosArrayPush
(
execNode
,
&
addr
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
execNode
,
pDag
,
&
pRequest
->
body
.
pQueryJob
);
}
}
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_t
tmq_t
;
...
@@ -706,9 +713,13 @@ void* doFetchRow(SRequestObj* pRequest) {
...
@@ -706,9 +713,13 @@ void* doFetchRow(SRequestObj* pRequest) {
return
NULL
;
return
NULL
;
}
}
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pRequest
->
body
.
resInfo
.
pData
);
int32_t
code
=
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pRequest
->
body
.
resInfo
.
pData
);
setQueryResultByRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pRequest
->
body
.
resInfo
.
pData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
NULL
;
}
setQueryResultByRsp
(
&
pRequest
->
body
.
resInfo
,
(
SRetrieveTableRsp
*
)
pRequest
->
body
.
resInfo
.
pData
);
if
(
pResultInfo
->
numOfRows
==
0
)
{
if
(
pResultInfo
->
numOfRows
==
0
)
{
return
NULL
;
return
NULL
;
}
}
...
...
source/client/test/clientTests.cpp
浏览文件 @
6d9ae1af
...
@@ -49,7 +49,7 @@ int main(int argc, char** argv) {
...
@@ -49,7 +49,7 @@ int main(int argc, char** argv) {
TEST
(
testCase
,
driverInit_Test
)
{
taos_init
();
}
TEST
(
testCase
,
driverInit_Test
)
{
taos_init
();
}
TEST
(
testCase
,
connect_Test
)
{
TEST
(
testCase
,
connect_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
"abc1"
,
0
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
if
(
pConn
==
NULL
)
{
printf
(
"failed to connect to server, reason:%s
\n
"
,
taos_errstr
(
NULL
));
printf
(
"failed to connect to server, reason:%s
\n
"
,
taos_errstr
(
NULL
));
}
}
...
@@ -295,24 +295,24 @@ TEST(testCase, connect_Test) {
...
@@ -295,24 +295,24 @@ TEST(testCase, connect_Test) {
// taos_close(pConn);
// taos_close(pConn);
//}
//}
TEST
(
testCase
,
create_table_Test
)
{
//
TEST(testCase, create_table_Test) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert
(
pConn
!=
NULL
);
//
assert(pConn != NULL);
//
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
//
pRes
=
taos_query
(
pConn
,
"create table if not exists tm0(ts timestamp, k int)"
);
//
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
ASSERT_EQ
(
taos_errno
(
pRes
),
0
);
//
ASSERT_EQ(taos_errno(pRes), 0);
//
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
//
pRes
=
taos_query
(
pConn
,
"create table if not exists tm0(ts timestamp, k blob)"
);
//
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
ASSERT_NE
(
taos_errno
(
pRes
),
0
);
//
ASSERT_NE(taos_errno(pRes), 0);
//
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
taos_close
(
pConn
);
//
taos_close(pConn);
}
//
}
//TEST(testCase, create_ctable_Test) {
//TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
@@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) {
...
@@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) {
// taos_close(pConn);
// taos_close(pConn);
//}
//}
TEST
(
testCase
,
show_stable_Test
)
{
//TEST(testCase, show_stable_Test) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert
(
pConn
!=
nullptr
);
// assert(pConn != nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
//// TAOS_RES* pRes = taos_query(pConn, "use abc1");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
//
// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
// if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_free_result(pRes);
// taos_close(pConn);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show abc1.stables"
);
//}
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to show stables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//
//
//TEST(testCase, show_vgroup_Test) {
//TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
@@ -521,29 +521,29 @@ TEST(testCase, show_stable_Test) {
...
@@ -521,29 +521,29 @@ TEST(testCase, show_stable_Test) {
// taosHashCleanup(phash);
// taosHashCleanup(phash);
//}
//}
//
//
TEST
(
testCase
,
create_topic_Test
)
{
//
TEST(testCase, create_topic_Test) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert
(
pConn
!=
NULL
);
//
assert(pConn != NULL);
//
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if
(
taos_errno
(
pRes
)
!=
0
)
{
//
if (taos_errno(pRes) != 0) {
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
//
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//
}
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
//
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE
(
pFields
==
nullptr
);
//
ASSERT_TRUE(pFields == nullptr);
//
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
//
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ
(
numOfFields
,
0
);
//
ASSERT_EQ(numOfFields, 0);
//
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
//
char
*
sql
=
"select * from tu"
;
//
char* sql = "select * from tu";
pRes
=
taos_create_topic
(
pConn
,
"test_topic_1"
,
sql
,
strlen
(
sql
));
//
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
taos_free_result
(
pRes
);
//
taos_free_result(pRes);
taos_close
(
pConn
);
//
taos_close(pConn);
}
//
}
//TEST(testCase, insert_test) {
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
@@ -614,30 +614,30 @@ TEST(testCase, create_topic_Test) {
...
@@ -614,30 +614,30 @@ TEST(testCase, create_topic_Test) {
// taos_close(pConn);
// taos_close(pConn);
//}
//}
//
TEST(testCase, projection_query_stables) {
TEST
(
testCase
,
projection_query_stables
)
{
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
//
ASSERT_NE(pConn, nullptr);
ASSERT_NE
(
pConn
,
nullptr
);
//
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
//
taos_free_result(pRes);
taos_free_result
(
pRes
);
//
// pRes = taos_query(pConn, "select ts,k
from m1");
pRes
=
taos_query
(
pConn
,
"select ts
from m1"
);
//
if (taos_errno(pRes) != 0) {
if
(
taos_errno
(
pRes
)
!=
0
)
{
//
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
//
taos_free_result(pRes);
taos_free_result
(
pRes
);
//
ASSERT_TRUE(false);
ASSERT_TRUE
(
false
);
//
}
}
//
//
TAOS_ROW pRow = NULL;
TAOS_ROW
pRow
=
NULL
;
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
//
int32_t numOfFields = taos_num_fields(pRes);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
//
//
char str[512] = {0};
char
str
[
512
]
=
{
0
};
//
while ((pRow = taos_fetch_row(pRes)) != NULL) {
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
//
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
//
printf("%s\n", str);
printf
(
"%s
\n
"
,
str
);
//
}
}
//
//
taos_free_result(pRes);
taos_free_result
(
pRes
);
//
taos_close(pConn);
taos_close
(
pConn
);
//
}
}
source/dnode/vnode/inc/meta.h
浏览文件 @
6d9ae1af
...
@@ -67,7 +67,7 @@ char * metaTbCursorNext(SMTbCursor *pTbCur);
...
@@ -67,7 +67,7 @@ char * metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
char
*
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
// Options
// Options
void
metaOptionsInit
(
SMetaCfg
*
pMetaCfg
);
void
metaOptionsInit
(
SMetaCfg
*
pMetaCfg
);
...
...
source/dnode/vnode/inc/tsdb.h
浏览文件 @
6d9ae1af
...
@@ -92,6 +92,7 @@ int tsdbOptionsInit(STsdbCfg *);
...
@@ -92,6 +92,7 @@ int tsdbOptionsInit(STsdbCfg *);
void
tsdbOptionsClear
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
typedef
void
*
tsdbReadHandleT
;
typedef
void
*
tsdbReadHandleT
;
/**
/**
* Get the data block iterator, starting from position according to the query condition
* Get the data block iterator, starting from position according to the query condition
*
*
...
@@ -123,6 +124,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
...
@@ -123,6 +124,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
bool
isTsdbCacheLastRow
(
tsdbReadHandleT
*
pTsdbReadHandle
);
bool
isTsdbCacheLastRow
(
tsdbReadHandleT
*
pTsdbReadHandle
);
/**
*
* @param tsdb
* @param uid
* @param skey
* @param pTagCond
* @param len
* @param tagNameRelType
* @param tbnameCond
* @param pGroupInfo
* @param pColIndex
* @param numOfCols
* @param reqId
* @return
*/
int32_t
tsdbQuerySTableByTagCond
(
STsdb
*
tsdb
,
uint64_t
uid
,
TSKEY
skey
,
const
char
*
pTagCond
,
size_t
len
,
int16_t
tagNameRelType
,
const
char
*
tbnameCond
,
STableGroupInfo
*
pGroupInfo
,
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
);
/**
/**
* get num of rows in mem table
* get num of rows in mem table
*
*
...
...
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
6d9ae1af
...
@@ -658,7 +658,7 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
...
@@ -658,7 +658,7 @@ void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
}
}
}
}
char
*
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
)
{
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
)
{
DBT
skey
=
{
0
};
DBT
skey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pkey
=
{
0
};
DBT
pval
=
{
0
};
DBT
pval
=
{
0
};
...
@@ -669,11 +669,13 @@ char *metaCtbCursorNext(SMCtbCursor *pCtbCur) {
...
@@ -669,11 +669,13 @@ char *metaCtbCursorNext(SMCtbCursor *pCtbCur) {
skey
.
data
=
&
(
pCtbCur
->
suid
);
skey
.
data
=
&
(
pCtbCur
->
suid
);
skey
.
size
=
sizeof
(
pCtbCur
->
suid
);
skey
.
size
=
sizeof
(
pCtbCur
->
suid
);
if
(
pCtbCur
->
pCur
->
get
(
pCtbCur
->
pCur
,
&
skey
,
&
pval
,
DB_NEXT
)
==
0
)
{
if
(
pCtbCur
->
pCur
->
pget
(
pCtbCur
->
pCur
,
&
skey
,
&
pkey
,
&
pval
,
DB_NEXT
)
==
0
)
{
pBuf
=
pval
.
data
;
tb_uid_t
id
=
*
(
tb_uid_t
*
)
pkey
.
data
;
metaDecodeTbInfo
(
pBuf
,
&
tbCfg
);
assert
(
id
!=
0
);
return
tbCfg
.
name
;
return
id
;
// metaDecodeTbInfo(pBuf, &tbCfg);
// return tbCfg.;
}
else
{
}
else
{
return
NULL
;
return
0
;
}
}
}
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
6d9ae1af
...
@@ -152,7 +152,7 @@ typedef struct STsdbReadHandle {
...
@@ -152,7 +152,7 @@ typedef struct STsdbReadHandle {
typedef
struct
STableGroupSupporter
{
typedef
struct
STableGroupSupporter
{
int32_t
numOfCols
;
int32_t
numOfCols
;
SColIndex
*
pCols
;
SColIndex
*
pCols
;
S
TSchema
*
pTagSchema
;
S
Schema
*
pTagSchema
;
}
STableGroupSupporter
;
}
STableGroupSupporter
;
static
STimeWindow
updateLastrowForEachGroup
(
STableGroupInfo
*
groupList
);
static
STimeWindow
updateLastrowForEachGroup
(
STableGroupInfo
*
groupList
);
...
@@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
...
@@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return
(
tsdbReadHandleT
)
pReadHandle
;
return
(
tsdbReadHandleT
)
pReadHandle
;
_end:
_end:
// tsdbCleanupQueryHandle(pTsdb
ReadHandle);
tsdbCleanupQueryHandle
(
p
ReadHandle
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
...
@@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
...
@@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return
numOfRows
;
return
numOfRows
;
}
}
static
int32_t
getAllTableList
(
STable
*
pSuperTable
,
SArray
*
list
)
{
static
int32_t
getAllTableList
(
SMeta
*
pMeta
,
uint64_t
uid
,
SArray
*
list
)
{
SSkipListIterator
*
iter
=
NULL
;
//tSkipListCreateIter(pSuperTable->pIndex);
SMCtbCursor
*
pCur
=
metaOpenCtbCursor
(
pMeta
,
uid
);
while
(
tSkipListIterNext
(
iter
))
{
SSkipListNode
*
pNode
=
tSkipListIterGet
(
iter
);
STable
*
pTable
=
(
STable
*
)
SL_GET_NODE_DATA
((
SSkipListNode
*
)
pNode
);
while
(
1
)
{
tb_uid_t
id
=
metaCtbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
STableKeyInfo
info
=
{.
pTable
=
pTable
,
.
lastKey
=
TSKEY_INITIAL_VAL
};
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
taosArrayPush
(
list
,
&
info
);
taosArrayPush
(
list
,
&
info
);
}
}
tSkipListDestroyIter
(
ite
r
);
metaCloseCtbCurosr
(
pCu
r
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
...
@@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush
(
pGroups
,
&
g
);
taosArrayPush
(
pGroups
,
&
g
);
}
}
SArray
*
createTableGroup
(
SArray
*
pTableList
,
S
TSchema
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
,
TSKEY
skey
)
{
SArray
*
createTableGroup
(
SArray
*
pTableList
,
S
SchemaWrapper
*
pTagSchema
,
SColIndex
*
pCols
,
int32_t
numOfOrderCols
,
TSKEY
skey
)
{
assert
(
pTableList
!=
NULL
);
assert
(
pTableList
!=
NULL
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
pTableGroup
=
taosArrayInit
(
1
,
POINTER_BYTES
);
...
@@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
...
@@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
}
if
(
numOfOrderCols
==
0
||
size
==
1
)
{
// no group by tags clause or only one table
if
(
numOfOrderCols
==
0
||
size
==
1
)
{
// no group by tags clause or only one table
SArray
*
sa
=
taosArray
Init
(
size
,
sizeof
(
STableKeyInfo
)
);
SArray
*
sa
=
taosArray
Dup
(
pTableList
);
if
(
sa
==
NULL
)
{
if
(
sa
==
NULL
)
{
taosArrayDestroy
(
pTableGroup
);
taosArrayDestroy
(
pTableGroup
);
return
NULL
;
return
NULL
;
}
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
taosArrayGet
(
pTableList
,
i
);
STableKeyInfo
info
=
{.
pTable
=
pKeyInfo
->
pTable
,
.
lastKey
=
skey
};
taosArrayPush
(
sa
,
&
info
);
}
taosArrayPush
(
pTableGroup
,
&
sa
);
taosArrayPush
(
pTableGroup
,
&
sa
);
tsdbDebug
(
"all %"
PRIzu
" tables belong to one group"
,
size
);
tsdbDebug
(
"all %"
PRIzu
" tables belong to one group"
,
size
);
}
else
{
}
else
{
STableGroupSupporter
sup
=
{
0
};
STableGroupSupporter
sup
=
{
0
};
sup
.
numOfCols
=
numOfOrderCols
;
sup
.
numOfCols
=
numOfOrderCols
;
sup
.
pTagSchema
=
pTagSchema
;
sup
.
pTagSchema
=
pTagSchema
->
pSchema
;
sup
.
pCols
=
pCols
;
sup
.
pCols
=
pCols
;
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
...
@@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
...
@@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
//NOTE: not add ref count for super table
//NOTE: not add ref count for super table
SArray
*
res
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
SArray
*
res
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
S
TSchema
*
pTagSchema
=
metaGetTableSchema
(
tsdb
->
pMeta
,
uid
,
0
,
true
);
S
SchemaWrapper
*
pTagSchema
=
metaGetTableSchema
(
tsdb
->
pMeta
,
uid
,
0
,
true
);
// no tags and tbname condition, all child tables of this stable are involved
// no tags and tbname condition, all child tables of this stable are involved
if
(
tbnameCond
==
NULL
&&
(
pTagCond
==
NULL
||
len
==
0
))
{
if
(
tbnameCond
==
NULL
&&
(
pTagCond
==
NULL
||
len
==
0
))
{
assert
(
false
);
int32_t
ret
=
getAllTableList
(
tsdb
->
pMeta
,
uid
,
res
);
int32_t
ret
=
0
;
//getAllTableList(pTable, res);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
...
@@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro
...
@@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
}
#endif
static
void
*
doFreeColumnInfoData
(
SArray
*
pColumnInfoData
)
{
static
void
*
doFreeColumnInfoData
(
SArray
*
pColumnInfoData
)
{
if
(
pColumnInfoData
==
NULL
)
{
if
(
pColumnInfoData
==
NULL
)
{
return
NULL
;
return
NULL
;
...
@@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
...
@@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
return
NULL
;
return
NULL
;
}
}
void
tsdbCleanupQueryHandle
(
tsdbReadHandleT
queryHandle
)
{
void
tsdbCleanupQueryHandle
(
tsdbReadHandleT
queryHandle
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
queryHandle
;
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
queryHandle
;
if
(
pTsdbReadHandle
==
NULL
)
{
if
(
pTsdbReadHandle
==
NULL
)
{
...
@@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
...
@@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
tfree
(
pTsdbReadHandle
);
tfree
(
pTsdbReadHandle
);
}
}
#if 0
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL);
assert(pGroupList != NULL);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
6d9ae1af
...
@@ -649,6 +649,6 @@ int32_t getMaximumIdleDurationSec();
...
@@ -649,6 +649,6 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
);
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
STableGroupInfo
*
pGroupInfo
,
void
*
readerHandle
);
#endif // TDENGINE_EXECUTORIMPL_H
#endif // TDENGINE_EXECUTORIMPL_H
source/libs/executor/src/executorMain.c
浏览文件 @
6d9ae1af
...
@@ -13,10 +13,11 @@
...
@@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "os.h"
#include <tsdb.h>
#include "tarray.h"
#include "dataSinkMgt.h"
#include "dataSinkMgt.h"
#include "exception.h"
#include "exception.h"
#include "os.h"
#include "tarray.h"
#include "tcache.h"
#include "tcache.h"
#include "tglobal.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tmsg.h"
...
@@ -72,7 +73,45 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
...
@@ -72,7 +73,45 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
assert
(
tsdb
!=
NULL
&&
pSubplan
!=
NULL
);
assert
(
tsdb
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
tsdb
);
int32_t
code
=
0
;
uint64_t
uid
=
0
;
STimeWindow
window
=
TSWINDOW_INITIALIZER
;
int32_t
tableType
=
0
;
SPhyNode
*
pPhyNode
=
pSubplan
->
pNode
;
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
||
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
uid
=
pTableScanNode
->
scan
.
uid
;
window
=
pTableScanNode
->
window
;
tableType
=
pTableScanNode
->
scan
.
tableType
;
}
else
{
assert
(
0
);
}
STableGroupInfo
groupInfo
=
{
0
};
if
(
tableType
==
TSDB_SUPER_TABLE
)
{
code
=
tsdbQuerySTableByTagCond
(
tsdb
,
uid
,
window
.
skey
,
NULL
,
0
,
0
,
NULL
,
&
groupInfo
,
NULL
,
0
,
pSubplan
->
id
.
queryId
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
else
{
// Create one table group.
groupInfo
.
numOfTables
=
1
;
groupInfo
.
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
groupInfo
.
pGroupList
,
&
pa
);
}
if
(
groupInfo
.
numOfTables
==
0
)
{
code
=
0
;
// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
goto
_error
;
}
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
&
groupInfo
,
tsdb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
...
@@ -141,6 +180,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
...
@@ -141,6 +180,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
threadId
=
taosGetSelfPthreadId
();
// todo: remove it.
if
(
tinfo
==
NULL
)
{
return
NULL
;
}
*
pRes
=
NULL
;
*
pRes
=
NULL
;
int64_t
curOwner
=
0
;
int64_t
curOwner
=
0
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
6d9ae1af
...
@@ -7369,15 +7369,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
...
@@ -7369,15 +7369,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
}
}
}
}
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
)
{
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
STableGroupInfo
*
pGroupInfo
,
void
*
readerHandle
)
{
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
uint64_t
uid
=
0
;
SPhyNode
*
pPhyNode
=
pPlan
->
pNode
;
SPhyNode
*
pPhyNode
=
pPlan
->
pNode
;
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
||
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
if
(
pPhyNode
->
info
.
type
==
OP_TableScan
||
pPhyNode
->
info
.
type
==
OP_DataBlocksOptScan
)
{
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
STableScanPhyNode
*
pTableScanNode
=
(
STableScanPhyNode
*
)
pPhyNode
;
uid
=
pTableScanNode
->
scan
.
uid
;
cond
.
order
=
pTableScanNode
->
scan
.
order
;
cond
.
order
=
pTableScanNode
->
scan
.
order
;
cond
.
numOfCols
=
taosArrayGetSize
(
pTableScanNode
->
scan
.
node
.
pTargets
);
cond
.
numOfCols
=
taosArrayGetSize
(
pTableScanNode
->
scan
.
node
.
pTargets
);
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
cond
.
colList
=
calloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
...
@@ -7397,15 +7395,8 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
...
@@ -7397,15 +7395,8 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
assert
(
0
);
assert
(
0
);
}
}
STableGroupInfo
group
=
{.
numOfTables
=
1
,
.
pGroupList
=
taosArrayInit
(
1
,
POINTER_BYTES
)};
SArray
*
pa
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
STableKeyInfo
info
=
{.
pTable
=
NULL
,
.
lastKey
=
0
,
.
uid
=
uid
};
taosArrayPush
(
pa
,
&
info
);
taosArrayPush
(
group
.
pGroupList
,
&
pa
);
*
pTaskInfo
=
createExecTaskInfo
((
uint64_t
)
pPlan
->
id
.
queryId
);
*
pTaskInfo
=
createExecTaskInfo
((
uint64_t
)
pPlan
->
id
.
queryId
);
tsdbReadHandleT
tsdbReadHandle
=
tsdbQueryTables
(
readerHandle
,
&
cond
,
&
group
,
(
*
pTaskInfo
)
->
id
.
queryId
,
NULL
);
tsdbReadHandleT
tsdbReadHandle
=
tsdbQueryTables
(
readerHandle
,
&
cond
,
pGroupInfo
,
(
*
pTaskInfo
)
->
id
.
queryId
,
NULL
);
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
tsdbReadHandle
);
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
tsdbReadHandle
);
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
...
...
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
6d9ae1af
...
@@ -558,11 +558,15 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
...
@@ -558,11 +558,15 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
static
bool
timeWindowFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
static
bool
timeWindowFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
STimeWindow
*
win
=
(
STimeWindow
*
)
obj
;
STimeWindow
*
win
=
(
STimeWindow
*
)
obj
;
char
*
p
=
getString
(
json
,
jkTimeWindowStartKey
);
char
*
pStartKey
=
getString
(
json
,
jkTimeWindowStartKey
);
win
->
skey
=
strtoll
(
p
,
NULL
,
10
);
win
->
skey
=
strtoll
(
pStartKey
,
NULL
,
10
);
char
*
pEndKey
=
getString
(
json
,
jkTimeWindowEndKey
);
win
->
ekey
=
strtoll
(
pEndKey
,
NULL
,
10
);
tfree
(
pStartKey
);
tfree
(
pEndKey
);
p
=
getString
(
json
,
jkTimeWindowEndKey
);
win
->
ekey
=
strtoll
(
p
,
NULL
,
10
);
return
true
;
return
true
;
}
}
...
@@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse";
...
@@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse";
static
bool
scanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
static
bool
scanNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SScanPhyNode
*
pNode
=
(
const
SScanPhyNode
*
)
obj
;
const
SScanPhyNode
*
pNode
=
(
const
SScanPhyNode
*
)
obj
;
bool
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableId
,
pNode
->
uid
);
char
uid
[
40
]
=
{
0
};
snprintf
(
uid
,
tListLen
(
uid
),
"%"
PRIu64
,
pNode
->
uid
);
bool
res
=
cJSON_AddStringToObject
(
json
,
jkScanNodeTableId
,
uid
);
if
(
res
)
{
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableType
,
pNode
->
tableType
);
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableType
,
pNode
->
tableType
);
}
}
if
(
res
)
{
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableOrder
,
pNode
->
order
);
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableOrder
,
pNode
->
order
);
}
}
if
(
res
)
{
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableCount
,
pNode
->
count
);
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableCount
,
pNode
->
count
);
}
}
...
@@ -589,12 +598,17 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
...
@@ -589,12 +598,17 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
if
(
res
)
{
if
(
res
)
{
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableRevCount
,
pNode
->
reverse
);
res
=
cJSON_AddNumberToObject
(
json
,
jkScanNodeTableRevCount
,
pNode
->
reverse
);
}
}
return
res
;
return
res
;
}
}
static
bool
scanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
static
bool
scanNodeFromJson
(
const
cJSON
*
json
,
void
*
obj
)
{
SScanPhyNode
*
pNode
=
(
SScanPhyNode
*
)
obj
;
SScanPhyNode
*
pNode
=
(
SScanPhyNode
*
)
obj
;
pNode
->
uid
=
getNumber
(
json
,
jkScanNodeTableId
);
char
*
val
=
getString
(
json
,
jkScanNodeTableId
);
pNode
->
uid
=
strtoull
(
val
,
NULL
,
10
);
tfree
(
val
);
pNode
->
tableType
=
getNumber
(
json
,
jkScanNodeTableType
);
pNode
->
tableType
=
getNumber
(
json
,
jkScanNodeTableType
);
pNode
->
count
=
getNumber
(
json
,
jkScanNodeTableCount
);
pNode
->
count
=
getNumber
(
json
,
jkScanNodeTableCount
);
pNode
->
order
=
getNumber
(
json
,
jkScanNodeTableOrder
);
pNode
->
order
=
getNumber
(
json
,
jkScanNodeTableOrder
);
...
@@ -726,7 +740,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) {
...
@@ -726,7 +740,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) {
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
ep
->
inUse
);
res
=
cJSON_AddNumberToObject
(
json
,
jkNodeAddrInUse
,
ep
->
inUse
);
}
}
if
(
res
)
{
if
(
res
)
{
res
=
addRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
ep
->
epAddr
,
ep
->
numOfEps
,
sizeof
(
SEpAddr
)
);
res
=
addRawArray
(
json
,
jkNodeAddrEpAddrs
,
epAddrToJson
,
ep
->
epAddr
,
sizeof
(
SEpAddr
),
ep
->
numOfEps
);
}
}
return
res
;
return
res
;
}
}
...
...
source/libs/qworker/CMakeLists.txt
浏览文件 @
6d9ae1af
aux_source_directory
(
src QWORKER_SRC
)
aux_source_directory
(
src QWORKER_SRC
)
#add_library(qworker ${QWORKER_SRC})
#target_include_directories(
# qworker
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
#)
#
#target_link_libraries(
# qworker
# PRIVATE os util transport planner qcom executor
#)
add_library
(
qworker STATIC
${
QWORKER_SRC
}
)
add_library
(
qworker STATIC
${
QWORKER_SRC
}
)
target_include_directories
(
target_include_directories
(
...
@@ -18,11 +7,6 @@ target_include_directories(
...
@@ -18,11 +7,6 @@ target_include_directories(
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
)
#set_target_properties(qworker PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# )
target_link_libraries
(
qworker
target_link_libraries
(
qworker
PRIVATE os util transport planner qcom executor
PRIVATE os util transport planner qcom executor
)
)
...
...
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
6d9ae1af
...
@@ -29,7 +29,7 @@ extern "C" {
...
@@ -29,7 +29,7 @@ extern "C" {
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCH_MAX_C
O
NDIDATE_EP_NUM TSDB_MAX_REPLICA
#define SCH_MAX_C
A
NDIDATE_EP_NUM TSDB_MAX_REPLICA
enum
{
enum
{
SCH_READ
=
1
,
SCH_READ
=
1
,
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
6d9ae1af
...
@@ -26,9 +26,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
...
@@ -26,9 +26,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
pTask
->
level
=
pLevel
;
pTask
->
level
=
pLevel
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
pTask
->
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
pTask
->
taskId
=
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
);
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_C
O
NDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
pTask
->
execAddrs
=
taosArrayInit
(
SCH_MAX_C
A
NDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
execAddrs
)
{
if
(
NULL
==
pTask
->
execAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d exec addrs failed"
,
SCH_MAX_C
O
NDIDATE_EP_NUM
);
SCH_TASK_ELOG
(
"taosArrayInit %d exec addrs failed"
,
SCH_MAX_C
A
NDIDATE_EP_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -383,9 +383,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
...
@@ -383,9 +383,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
}
pTask
->
candidateIdx
=
0
;
pTask
->
candidateIdx
=
0
;
pTask
->
candidateAddrs
=
taosArrayInit
(
SCH_MAX_C
O
NDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
pTask
->
candidateAddrs
=
taosArrayInit
(
SCH_MAX_C
A
NDIDATE_EP_NUM
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pTask
->
candidateAddrs
)
{
if
(
NULL
==
pTask
->
candidateAddrs
)
{
SCH_TASK_ELOG
(
"taosArrayInit %d condidate addrs failed"
,
SCH_MAX_C
O
NDIDATE_EP_NUM
);
SCH_TASK_ELOG
(
"taosArrayInit %d condidate addrs failed"
,
SCH_MAX_C
A
NDIDATE_EP_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -405,10 +405,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
...
@@ -405,10 +405,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if
(
pJob
->
nodeList
)
{
if
(
pJob
->
nodeList
)
{
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
nodeNum
=
taosArrayGetSize
(
pJob
->
nodeList
);
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_C
O
NDIDATE_EP_NUM
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
nodeNum
&&
addNum
<
SCH_MAX_C
A
NDIDATE_EP_NUM
;
++
i
)
{
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
SQueryNodeAddr
*
naddr
=
taosArrayGet
(
pJob
->
nodeList
,
i
);
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
&
pTask
->
plan
->
execNode
))
{
if
(
NULL
==
taosArrayPush
(
pTask
->
candidateAddrs
,
naddr
))
{
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d"
,
addNum
,
errno
);
SCH_TASK_ELOG
(
"taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d"
,
addNum
,
errno
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -418,12 +418,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
...
@@ -418,12 +418,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
}
if
(
addNum
<=
0
)
{
if
(
addNum
<=
0
)
{
SCH_TASK_ELOG
(
"no available execNode as c
o
ndidate addr, nodeNum:%d"
,
nodeNum
);
SCH_TASK_ELOG
(
"no available execNode as c
a
ndidate addr, nodeNum:%d"
,
nodeNum
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
/*
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_C
O
NDIDATE_EP_NUM; ++i) {
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_C
A
NDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
...
@@ -734,7 +734,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
...
@@ -734,7 +734,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
}
}
/*
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_C
O
NDIDATE_EP_NUM) {
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_C
A
NDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
...
@@ -1165,6 +1165,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
...
@@ -1165,6 +1165,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG
(
"subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
pTask
->
msg
,
pTask
->
msgLen
);
SCH_TASK_ELOG
(
"subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
pTask
->
msg
,
pTask
->
msgLen
);
SCH_ERR_JRET
(
code
);
SCH_ERR_JRET
(
code
);
}
}
printf
(
"physical plan:%s
\n
"
,
pTask
->
msg
);
}
}
SCH_ERR_JRET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
SCH_ERR_JRET
(
schSetTaskCandidateAddrs
(
pJob
,
pTask
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录