Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
451740cb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
451740cb
编写于
1月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/tfs
上级
f5d84cae
aa4a4d05
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
271 addition
and
183 deletion
+271
-183
include/common/tmsg.h
include/common/tmsg.h
+17
-2
include/libs/executor/executor.h
include/libs/executor/executor.h
+8
-1
include/libs/function/function.h
include/libs/function/function.h
+1
-0
include/libs/planner/plannerOp.h
include/libs/planner/plannerOp.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+60
-1
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+0
-50
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+5
-0
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+57
-57
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+12
-7
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+19
-14
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-0
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+2
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-7
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-5
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+33
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+5
-7
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-8
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+14
-13
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+8
-3
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+3
-1
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+1
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+3
-3
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+2
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
451740cb
...
...
@@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq {
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
S
Array
*
tasks
;
// SArray<SSubQueryMsg>
S
SubQueryMsg
msg
;
}
SMqSetCVgReq
;
static
FORCE_INLINE
int32_t
tEncodeSSubQueryMsg
(
void
**
buf
,
const
SSubQueryMsg
*
pMsg
)
{
int32_t
tlen
=
sizeof
(
SSubQueryMsg
)
+
pMsg
->
contentLen
;
if
(
buf
==
NULL
)
return
tlen
;
memcpy
(
*
buf
,
pMsg
,
tlen
);
*
buf
=
POINTER_SHIFT
(
*
buf
,
tlen
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSSubQueryMsg
(
void
*
buf
,
SSubQueryMsg
*
pMsg
)
{
int32_t
tlen
=
sizeof
(
SSubQueryMsg
)
+
((
SSubQueryMsg
*
)
buf
)
->
contentLen
;
memcpy
(
pMsg
,
buf
,
tlen
);
return
POINTER_SHIFT
(
buf
,
tlen
);
}
static
FORCE_INLINE
int32_t
tEncodeSMqSetCVgReq
(
void
**
buf
,
const
SMqSetCVgReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
...
...
@@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen
+=
taosEncodeString
(
buf
,
pReq
->
sql
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
logicalPlan
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
physicalPlan
);
tlen
+=
tEncodeSSubQueryMsg
(
buf
,
&
pReq
->
msg
);
return
tlen
;
}
...
...
@@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf
=
taosDecodeString
(
buf
,
&
pReq
->
sql
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
logicalPlan
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
physicalPlan
);
pReq
->
tasks
=
NULL
;
buf
=
tDecodeSSubQueryMsg
(
buf
,
&
pReq
->
msg
)
;
return
buf
;
}
...
...
include/libs/executor/executor.h
浏览文件 @
451740cb
...
...
@@ -26,6 +26,13 @@ typedef void* qTaskInfo_t;
typedef
void
*
DataSinkHandle
;
struct
SSubplan
;
/**
* Create the exec task for streaming mode
* @param pMsg
* @param pStreamBlockReadHandle
* @return
*/
qTaskInfo_t
createStreamExecTaskInfo
(
SSubQueryMsg
*
pMsg
,
void
*
pStreamBlockReadHandle
);
/**
* Create the exec task object according to task json
...
...
@@ -203,4 +210,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
}
#endif
#endif
/*_TD_EXECUTOR_H_*/
\ No newline at end of file
#endif
/*_TD_EXECUTOR_H_*/
include/libs/function/function.h
浏览文件 @
451740cb
...
...
@@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc {
bool
interpQuery
;
bool
distinct
;
bool
join
;
bool
continueQuery
;
}
SMultiFunctionsDesc
;
int32_t
getResultDataInfo
(
int32_t
dataType
,
int32_t
dataBytes
,
int32_t
functionId
,
int32_t
param
,
SResultDataInfo
*
pInfo
,
int16_t
extLength
,
...
...
include/libs/planner/plannerOp.h
浏览文件 @
451740cb
...
...
@@ -23,6 +23,7 @@
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif
OP_ENUM_MACRO
(
StreamScan
)
OP_ENUM_MACRO
(
TableScan
)
OP_ENUM_MACRO
(
DataBlocksOptScan
)
OP_ENUM_MACRO
(
TableSeqScan
)
...
...
include/libs/qcom/query.h
浏览文件 @
451740cb
...
...
@@ -109,12 +109,71 @@ typedef struct STableMetaOutput {
STableMeta
*
tbMeta
;
}
STableMetaOutput
;
const
SSchema
*
tGetTbnameColumnSchema
();
typedef
struct
SDataBuf
{
void
*
pData
;
uint32_t
len
;
}
SDataBuf
;
typedef
int32_t
(
*
__async_send_cb_fn_t
)(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
typedef
int32_t
(
*
__async_exec_fn_t
)(
void
*
param
);
typedef
struct
SMsgSendInfo
{
__async_send_cb_fn_t
fp
;
//async callback function
void
*
param
;
uint64_t
requestId
;
uint64_t
requestObjRefId
;
int32_t
msgType
;
SDataBuf
msgInfo
;
}
SMsgSendInfo
;
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
// vgId or qnodeId
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SQueryNodeAddr
;
static
FORCE_INLINE
void
tConvertQueryAddrToEpSet
(
SEpSet
*
pEpSet
,
const
SQueryNodeAddr
*
pAddr
)
{
pEpSet
->
inUse
=
pAddr
->
inUse
;
pEpSet
->
numOfEps
=
pAddr
->
numOfEps
;
for
(
int
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
j
++
)
{
pEpSet
->
port
[
j
]
=
pAddr
->
epAddr
[
j
].
port
;
memcpy
(
pEpSet
->
fqdn
[
j
],
pAddr
->
epAddr
[
j
].
fqdn
,
TSDB_FQDN_LEN
);
}
}
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t
asyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
);
void
initQueryModuleMsgHandle
();
const
SSchema
*
tGetTbnameColumnSchema
();
bool
tIsValidSchema
(
struct
SSchema
*
pSchema
,
int32_t
numOfCols
,
int32_t
numOfTags
);
extern
int32_t
(
*
queryBuildMsg
[
TDMT_MAX
])(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
);
extern
int32_t
(
*
queryProcessMsgRsp
[
TDMT_MAX
])(
void
*
output
,
char
*
msg
,
int32_t
msgSize
);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
...
...
include/libs/transport/trpc.h
浏览文件 @
451740cb
...
...
@@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
);
void
rpcCancelRequest
(
int64_t
rid
);
typedef
struct
SDataBuf
{
void
*
pData
;
uint32_t
len
;
}
SDataBuf
;
typedef
int32_t
(
*
__async_send_cb_fn_t
)(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
typedef
int32_t
(
*
__async_exec_fn_t
)(
void
*
param
);
typedef
struct
SMsgSendInfo
{
__async_send_cb_fn_t
fp
;
//async callback function
void
*
param
;
uint64_t
requestId
;
uint64_t
requestObjRefId
;
int32_t
msgType
;
SDataBuf
msgInfo
;
}
SMsgSendInfo
;
typedef
struct
SQueryNodeAddr
{
int32_t
nodeId
;
// vgId or qnodeId
int8_t
inUse
;
int8_t
numOfEps
;
SEpAddr
epAddr
[
TSDB_MAX_REPLICA
];
}
SQueryNodeAddr
;
bool
tIsValidSchema
(
struct
SSchema
*
pSchema
,
int32_t
numOfCols
,
int32_t
numOfTags
);
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t
asyncSendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
int64_t
*
pTransporterId
,
const
SMsgSendInfo
*
pInfo
);
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
451740cb
...
...
@@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
CHECK_CODE_GOTO
(
parseSql
(
pRequest
,
&
pQueryNode
),
_return
);
SQueryStmtInfo
*
pQueryStmtInfo
=
(
SQueryStmtInfo
*
)
pQueryNode
;
pQueryStmtInfo
->
info
.
continueQuery
=
true
;
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO
(
qCreateQueryDag
(
pQueryNode
,
&
pRequest
->
body
.
pDag
,
pRequest
->
requestId
),
_return
);
...
...
@@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto
_return
;
}
printf
(
"%s
\n
"
,
pStr
);
// The topic should be related to a database that the queried table is belonged to.
SName
name
=
{
0
};
char
dbName
[
TSDB_DB_FNAME_LEN
]
=
{
0
};
...
...
source/client/test/clientTests.cpp
浏览文件 @
451740cb
...
...
@@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) {
// taosHashCleanup(phash);
//}
//
//
TEST(testCase, create_topic_Test) {
//
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//
assert(pConn != NULL);
//
//
TAOS_RES* pRes = taos_query(pConn, "use abc1");
//
if (taos_errno(pRes) != 0) {
//
printf("error in use db, reason:%s\n", taos_errstr(pRes));
//
}
//
taos_free_result(pRes);
//
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//
ASSERT_TRUE(pFields == nullptr);
//
//
int32_t numOfFields = taos_num_fields(pRes);
//
ASSERT_EQ(numOfFields, 0);
//
//
taos_free_result(pRes);
//
//
char* sql = "select * from tu";
//
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
//
taos_free_result(pRes);
//
taos_close(pConn);
//
}
//
TEST
(
testCase
,
create_topic_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
ASSERT_EQ
(
numOfFields
,
0
);
taos_free_result
(
pRes
);
char
*
sql
=
"select * from tu"
;
pRes
=
taos_create_topic
(
pConn
,
"test_topic_1"
,
sql
,
strlen
(
sql
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
...
...
@@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) {
// taos_close(pConn);
//}
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use dbv"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table tx using st tags(111111111111111)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"select count(*) from tu"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to select from table, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
ASSERT_TRUE
(
false
);
}
TAOS_ROW
pRow
=
NULL
;
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
int32_t
numOfFields
=
taos_num_fields
(
pRes
);
char
str
[
512
]
=
{
0
};
while
((
pRow
=
taos_fetch_row
(
pRes
))
!=
NULL
)
{
int32_t
code
=
taos_print_row
(
str
,
pRow
,
pFields
,
numOfFields
);
printf
(
"%s
\n
"
,
str
);
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
\ No newline at end of file
//TEST(testCase, agg_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use dbv");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create table, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select count(*) from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
\ No newline at end of file
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
451740cb
...
...
@@ -327,25 +327,30 @@ typedef struct SMqTopicConsumer {
#endif
typedef
struct
SMqConsumerEp
{
int32_t
vgId
;
// -1 for unassigned
SEpSet
epset
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
int32_t
vgId
;
// -1 for unassigned
SEpSet
epSet
;
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
int32_t
execLen
;
SSubQueryMsg
qExec
;
}
SMqConsumerEp
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
vgId
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
ep
s
et
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
ep
S
et
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
tEncodeSSubQueryMsg
(
buf
,
&
pConsumerEp
->
qExec
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
vgId
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
ep
s
et
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
ep
S
et
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
tDecodeSSubQueryMsg
(
buf
,
&
pConsumerEp
->
qExec
);
pConsumerEp
->
execLen
=
sizeof
(
SSubQueryMsg
)
+
pConsumerEp
->
qExec
.
contentLen
;
return
buf
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
451740cb
...
...
@@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
strcpy
(
req
.
sql
,
pTopic
->
sql
);
strcpy
(
req
.
logicalPlan
,
pTopic
->
logicalPlan
);
strcpy
(
req
.
physicalPlan
,
pTopic
->
physicalPlan
);
memcpy
(
&
req
.
msg
,
&
pCEp
->
qExec
,
pCEp
->
execLen
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
...
...
@@ -116,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// persist msg
STransAction
action
=
{
0
};
action
.
epSet
=
pCEp
->
ep
s
et
;
action
.
epSet
=
pCEp
->
ep
S
et
;
action
.
pCont
=
reqStr
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
...
...
@@ -141,20 +142,24 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
}
static
int
mndInitUnassignedVg
(
SMnode
*
pMnode
,
SMqTopicObj
*
pTopic
,
SArray
*
unassignedVg
)
{
SMqConsumerEp
CEp
;
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
int32_t
sz
;
SVgObj
*
pVgroup
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
NULL
,
(
void
**
)
&
pVgroup
);
while
(
pIter
!=
NULL
)
{
if
(
pVgroup
->
dbUid
==
pTopic
->
dbUid
)
{
CEp
.
epset
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
CEp
.
vgId
=
pVgroup
->
vgId
;
taosArrayPush
(
unassignedVg
,
&
CEp
);
}
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
//convert phyplan to dag
SQueryDag
*
pDag
=
qStringToDag
(
pTopic
->
physicalPlan
);
SArray
*
pArray
;
if
(
schedulerConvertDagToTaskList
(
pDag
,
&
pArray
)
<
0
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pArray
);
//convert dag to msg
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqConsumerEp
CEp
;
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
STaskInfo
*
pTaskInfo
=
taosArrayGet
(
pArray
,
i
);
tConvertQueryAddrToEpSet
(
&
CEp
.
epSet
,
&
pTaskInfo
->
addr
);
CEp
.
vgId
=
pTaskInfo
->
addr
.
nodeId
;
taosArrayPush
(
unassignedVg
,
&
CEp
);
}
qDestroyQueryDag
(
pDag
);
return
0
;
}
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
451740cb
...
...
@@ -26,6 +26,7 @@ target_link_libraries(
PUBLIC tfs
PUBLIC wal
PUBLIC scheduler
PUBLIC executor
PUBLIC qworker
)
...
...
source/dnode/vnode/inc/tq.h
浏览文件 @
451740cb
...
...
@@ -21,6 +21,7 @@
#include "meta.h"
#include "os.h"
#include "scheduler.h"
#include "executor.h"
#include "taoserror.h"
#include "tlist.h"
#include "tmsg.h"
...
...
@@ -165,7 +166,7 @@ typedef struct STqTaskItem {
int8_t
status
;
int64_t
offset
;
void
*
dst
;
SSubQueryMsg
*
pMsg
;
qTaskInfo_t
task
;
}
STqTaskItem
;
// new version
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
451740cb
...
...
@@ -634,7 +634,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
}
SSubmitMsg
*
pCont
=
(
SSubmitMsg
*
)
&
pHead
->
head
.
body
;
SSubQueryMsg
*
pQueryMsg
=
pHandle
->
buffer
.
output
[
pos
].
pMsg
;
/*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/
// TODO: launch query and get output data
void
*
outputData
;
...
...
@@ -681,7 +681,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if
(
schedulerConvertDagToTaskList
(
pDag
,
&
pArray
)
<
0
)
{
// TODO: handle error
}
ASSERT
(
taosArrayGetSize
(
pArray
)
==
0
);
STaskInfo
*
pInfo
=
taosArrayGet
(
pArray
,
0
);
SArray
*
pTasks
;
schedulerCopyTask
(
pInfo
,
&
pTasks
,
TQ_BUFFER_SIZE
);
...
...
@@ -689,8 +688,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
pTopic
->
buffer
.
lastOffset
=
-
1
;
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
SSubQueryMsg
*
pMsg
=
taosArrayGet
(
pTasks
,
i
);
pTopic
->
buffer
.
output
[
i
].
pMsg
=
pMsg
;
pTopic
->
buffer
.
output
[
i
].
status
=
0
;
pTopic
->
buffer
.
output
[
i
].
task
=
createStreamExecTaskInfo
(
pMsg
,
NULL
);
}
pTopic
->
pReadhandle
=
walOpenReadHandle
(
pTq
->
pWal
);
// write mq meta
...
...
@@ -733,6 +732,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
// TODO: filter out unused column
return
0
;
}
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
)
{
int32_t
sversion
=
pHandle
->
pBlock
->
sversion
;
SSchemaWrapper
*
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pMeta
,
pHandle
->
pBlock
->
uid
,
sversion
,
true
);
...
...
@@ -762,7 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush
(
pArray
,
&
colInfo
);
return
pArray
;
}
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
* status) {*/
/*return 0;*/
/*}*/
source/libs/executor/inc/executorimpl.h
浏览文件 @
451740cb
...
...
@@ -253,12 +253,8 @@ typedef struct SExecTaskInfo {
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char
*
sql
;
// query sql string
jmp_buf
env
;
//
DataSinkHandle
dsHandle
;
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
...
...
@@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
);
#endif // TDENGINE_EXECUTORIMPL_H
source/libs/executor/src/executor.c
浏览文件 @
451740cb
...
...
@@ -11,4 +11,36 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "planner.h"
#include "executor.h"
qTaskInfo_t
createStreamExecTaskInfo
(
SSubQueryMsg
*
pMsg
,
void
*
pStreamBlockReadHandle
)
{
if
(
pMsg
==
NULL
||
pStreamBlockReadHandle
==
NULL
)
{
return
NULL
;
}
// print those info into log
pMsg
->
sId
=
be64toh
(
pMsg
->
sId
);
pMsg
->
queryId
=
be64toh
(
pMsg
->
queryId
);
pMsg
->
taskId
=
be64toh
(
pMsg
->
taskId
);
pMsg
->
contentLen
=
ntohl
(
pMsg
->
contentLen
);
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
pMsg
->
msg
,
&
plan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
}
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
pStreamBlockReadHandle
,
0
,
plan
,
&
pTaskInfo
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
return
NULL
;
}
return
pTaskInfo
;
}
source/libs/executor/src/executorMain.c
浏览文件 @
451740cb
...
...
@@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
tfree
(
param
->
prevResult
);
}
int32_t
qCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
assert
(
tsdb
!=
NULL
&&
pSubplan
!=
NULL
);
int32_t
qCreateExecTask
(
void
*
readHandle
,
int32_t
vgId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
)
{
assert
(
readHandle
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
tsdb
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto
_error
;
}
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
&
(
*
pTask
)
->
dsH
andle
);
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
h
andle
);
*
handle
=
(
*
pTask
)
->
dsHandle
;
_error:
_error:
// if failed to add ref for all tables in this query, abort current query
return
code
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
451740cb
...
...
@@ -7778,22 +7778,29 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
return
NULL
;
}
int32_t
doCreateExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
)
{
tsdbReadHandleT
tReaderHandle
=
NULL
;
int32_t
code
=
0
;
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
void
*
readerHandle
)
{
uint64_t
queryId
=
pPlan
->
id
.
queryId
;
SPhyNode
*
pPhyNode
=
pPlan
->
pNode
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
*
pTaskInfo
=
createExecTaskInfo
(
queryId
);
if
(
*
pTaskInfo
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
}
(
*
pTaskInfo
)
->
pRoot
=
doCreateOperatorTreeNode
(
pPlan
->
pNode
,
*
pTaskInfo
,
readerHandle
,
queryId
);
if
((
*
pTaskInfo
)
->
pRoot
==
NULL
)
{
return
terrno
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_complete
;
}
return
TSDB_CODE_SUCCESS
;
return
code
;
_complete:
tfree
(
*
pTaskInfo
);
terrno
=
code
;
return
code
;
}
/**
...
...
source/libs/planner/inc/plannerInt.h
浏览文件 @
451740cb
...
...
@@ -28,19 +28,20 @@ extern "C" {
#define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5
#define QNODE_LIMIT 6
#define QNODE_JOIN 7
#define QNODE_DISTINCT 8
#define QNODE_SORT 9
#define QNODE_UNION 10
#define QNODE_TIMEWINDOW 11
#define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13
#define QNODE_FILL 14
#define QNODE_MODIFY 15
#define QNODE_STREAMSCAN 3
#define QNODE_PROJECT 4
#define QNODE_AGGREGATE 5
#define QNODE_GROUPBY 6
#define QNODE_LIMIT 7
#define QNODE_JOIN 8
#define QNODE_DISTINCT 9
#define QNODE_SORT 10
#define QNODE_UNION 11
#define QNODE_TIMEWINDOW 12
#define QNODE_SESSIONWINDOW 13
#define QNODE_STATEWINDOW 14
#define QNODE_FILL 15
#define QNODE_MODIFY 16
typedef
struct
SQueryDistPlanNodeInfo
{
bool
stableQuery
;
// super table query or not
...
...
source/libs/planner/src/logicPlan.c
浏览文件 @
451740cb
...
...
@@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch
(
type
)
{
case
QNODE_TAGSCAN
:
case
QNODE_STREAMSCAN
:
case
QNODE_TABLESCAN
:
{
SQueryTableInfo
*
info
=
calloc
(
1
,
sizeof
(
SQueryTableInfo
));
memcpy
(
info
,
pExtInfo
,
sizeof
(
SQueryTableInfo
));
...
...
@@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
return
pNode
;
}
SQueryPlanNode
*
pNode
=
createQueryNode
(
QNODE_TABLESCAN
,
"TableScan"
,
NULL
,
0
,
NULL
,
0
,
info
);
SQueryPlanNode
*
pNode
=
NULL
;
if
(
pQueryInfo
->
info
.
continueQuery
)
{
pNode
=
createQueryNode
(
QNODE_STREAMSCAN
,
"StreamScan"
,
NULL
,
0
,
NULL
,
0
,
info
);
}
else
{
pNode
=
createQueryNode
(
QNODE_TABLESCAN
,
"TableScan"
,
NULL
,
0
,
NULL
,
0
,
info
);
}
if
(
!
pQueryInfo
->
info
.
projectionQuery
)
{
SArray
*
p
=
pQueryInfo
->
exprList
[
0
];
...
...
@@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
pNode
->
numOfExpr
=
num
;
pNode
->
pExpr
=
taosArrayInit
(
num
,
POINTER_BYTES
);
taosArrayAddAll
(
pNode
->
pExpr
,
p
);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
}
}
...
...
@@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
int32_t
len
=
len1
+
totalLen
;
switch
(
pQueryNode
->
info
.
type
)
{
case
QNODE_STREAMSCAN
:
case
QNODE_TABLESCAN
:
{
SQueryTableInfo
*
pInfo
=
(
SQueryTableInfo
*
)
pQueryNode
->
pExtInfo
;
len1
=
sprintf
(
buf
+
len
,
"%s #%"
PRIu64
,
pInfo
->
tableName
,
pInfo
->
uid
);
...
...
@@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
int32_t
queryPlanToString
(
struct
SQueryPlanNode
*
pQueryNode
,
char
**
str
)
{
assert
(
pQueryNode
);
*
str
=
calloc
(
1
,
4096
);
int32_t
len
=
sprintf
(
*
str
,
"===== logic plan =====
\n
"
);
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
451740cb
...
...
@@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static
SPhyNode
*
createSingleTableScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
,
SSubplan
*
subplan
)
{
vgroupMsgToEpSet
(
&
(
pTable
->
pMeta
->
vgroupList
->
vgroups
[
0
]),
&
subplan
->
execNode
);
return
createUserTableScanNode
(
pPlanNode
,
pTable
,
OP_TableScan
);
int32_t
type
=
(
pPlanNode
->
info
.
type
==
QNODE_TABLESCAN
)
?
OP_TableScan
:
OP_StreamScan
;
return
createUserTableScanNode
(
pPlanNode
,
pTable
,
type
);
}
static
SPhyNode
*
createTableScanNode
(
SPlanContext
*
pCxt
,
SQueryPlanNode
*
pPlanNode
)
{
...
...
@@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case
QNODE_TAGSCAN
:
node
=
createTagScanNode
(
pPlanNode
);
break
;
case
QNODE_STREAMSCAN
:
case
QNODE_TABLESCAN
:
node
=
createTableScanNode
(
pCxt
,
pPlanNode
);
break
;
...
...
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
451740cb
...
...
@@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
static
bool
specificPhyNodeToJson
(
const
void
*
obj
,
cJSON
*
json
)
{
const
SPhyNode
*
phyNode
=
(
const
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_StreamScan
:
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
451740cb
...
...
@@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code
=
qStringToSubplan
(
qwMsg
->
msg
,
&
plan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
QW_TASK_ELOG
(
"task string to subplan failed, code:%
x"
,
code
);
QW_TASK_ELOG
(
"task string to subplan failed, code:%
s"
,
tstrerror
(
code
)
);
QW_ERR_JRET
(
code
);
}
code
=
qCreateExecTask
(
qwMsg
->
node
,
0
,
(
struct
SSubplan
*
)
plan
,
&
pTaskInfo
,
&
sinkHandle
);
if
(
code
)
{
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%
x"
,
code
);
QW_TASK_ELOG
(
"qCreateExecTask failed, code:%
s"
,
tstrerror
(
code
)
);
QW_ERR_JRET
(
code
);
}
if
(
(
pTaskInfo
&&
NULL
==
sinkHandle
)
||
(
NULL
==
pTaskInfo
&&
sinkHandle
)
)
{
if
(
NULL
==
sinkHandle
||
NULL
==
pTaskInfo
)
{
QW_TASK_ELOG
(
"create task result error, taskHandle:%p, sinkHandle:%p"
,
pTaskInfo
,
sinkHandle
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
451740cb
...
...
@@ -666,12 +666,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
int32_t
taskDone
=
0
;
int32_t
code
=
0
;
SCH_TASK_DLOG
(
"taskOnFailure, code:%
x"
,
errCode
);
SCH_TASK_DLOG
(
"taskOnFailure, code:%
s"
,
tstrerror
(
errCode
)
);
SCH_ERR_JRET
(
schTaskCheckAndSetRetry
(
pJob
,
pTask
,
errCode
,
&
needRetry
));
if
(
!
needRetry
)
{
SCH_TASK_ELOG
(
"task failed and no more retry, code:%
x"
,
errCode
);
SCH_TASK_ELOG
(
"task failed and no more retry, code:%
s"
,
tstrerror
(
errCode
)
);
if
(
SCH_GET_TASK_STATUS
(
pTask
)
==
JOB_TASK_STATUS_EXECUTING
)
{
code
=
schMoveTaskToFailList
(
pJob
,
pTask
,
&
moved
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录