Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d3104e78
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d3104e78
编写于
3月 22, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
stream task exec
上级
a6044dc2
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
531 addition
and
81 deletion
+531
-81
example/CMakeLists.txt
example/CMakeLists.txt
+24
-6
example/src/tstream.c
example/src/tstream.c
+224
-0
include/client/taos.h
include/client/taos.h
+6
-2
include/common/tcommon.h
include/common/tcommon.h
+3
-0
include/common/tmsg.h
include/common/tmsg.h
+8
-6
include/libs/executor/executor.h
include/libs/executor/executor.h
+26
-24
source/client/src/tmq.c
source/client/src/tmq.c
+2
-2
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+23
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-4
source/dnode/mgmt/vnode/src/vmMsg.c
source/dnode/mgmt/vnode/src/vmMsg.c
+1
-0
source/dnode/mnode/impl/inc/mndSnode.h
source/dnode/mnode/impl/inc/mndSnode.h
+1
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+32
-30
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+9
-0
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+2
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+163
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-4
未找到文件。
example/CMakeLists.txt
浏览文件 @
d3104e78
aux_source_directory
(
src TMQ_DEMO_SRC
)
add_executable
(
tmq
""
)
add_executable
(
tstream
""
)
add_executable
(
tmq
${
TMQ_DEMO_SRC
}
)
target_link_libraries
(
tmq taos
target_sources
(
tmq
PRIVATE
"src/tmq.c"
)
target_include_directories
(
tmq
target_sources
(
tstream
PRIVATE
"src/tstream.c"
)
target_link_libraries
(
tmq
taos
)
target_link_libraries
(
tstream
taos
)
target_include_directories
(
tmq
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
target_include_directories
(
tstream
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/inc"
)
SET_TARGET_PROPERTIES
(
tmq PROPERTIES OUTPUT_NAME tmq
)
SET_TARGET_PROPERTIES
(
tstream PROPERTIES OUTPUT_NAME tstream
)
example/src/tstream.c
0 → 100644
浏览文件 @
d3104e78
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
static
void
msg_process
(
tmq_message_t
*
message
)
{
tmqShowMsg
(
message
);
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, k int) tags(a int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists tu1 using st1 tags(1)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create table if not exists tu2 using st1 tags(2)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create child table tu2, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
return
0
;
}
int32_t
create_stream
()
{
printf
(
"create topic
\n
"
);
TAOS_RES
*
pRes
;
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
if
(
pConn
==
NULL
)
{
return
-
1
;
}
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
const
char
*
sql
=
"select ts,k from tu1"
;
pRes
=
tmq_create_stream
(
pConn
,
"stream1"
,
"out1"
,
sql
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream out1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
taos_close
(
pConn
);
return
0
;
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
tmq_resp_err_t
resp
,
tmq_topic_vgroup_list_t
*
offsets
,
void
*
param
)
{
printf
(
"commit %d
\n
"
,
resp
);
}
tmq_t
*
build_consumer
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
tmq_conf_t
*
conf
=
tmq_conf_new
();
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set_offset_commit_cb
(
conf
,
tmq_commit_cb_print
);
tmq_t
*
tmq
=
tmq_consumer_new
(
pConn
,
conf
,
NULL
,
0
);
return
tmq
;
}
tmq_list_t
*
build_topic_list
()
{
tmq_list_t
*
topic_list
=
tmq_list_new
();
tmq_list_append
(
topic_list
,
"test_stb_topic_1"
);
return
topic_list
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
/*int32_t cnt = 0;*/
/*clock_t startTime = clock();*/
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
/*cnt++;*/
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
sync_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
static
const
int
MIN_COMMIT_COUNT
=
1
;
int
msg_count
=
0
;
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
return
;
}
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
if
((
++
msg_count
%
MIN_COMMIT_COUNT
)
==
0
)
tmq_commit
(
tmq
,
NULL
,
0
);
}
}
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
perf_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
batchCnt
=
0
;
int32_t
skipLogNum
=
0
;
clock_t
startTime
=
clock
();
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
batchCnt
++
;
skipLogNum
+=
tmqGetSkipLogNum
(
tmqmessage
);
/*msg_process(tmqmessage);*/
tmq_message_destroy
(
tmqmessage
);
}
else
{
break
;
}
}
clock_t
endTime
=
clock
();
printf
(
"log batch cnt: %d, skip log cnt: %d, time used:%f s
\n
"
,
batchCnt
,
skipLogNum
,
(
double
)(
endTime
-
startTime
)
/
CLOCKS_PER_SEC
);
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
int
code
;
if
(
argc
>
1
)
{
printf
(
"env init
\n
"
);
code
=
init_env
();
}
create_topic
();
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop
(
tmq
,
topic_list
);
}
include/client/taos.h
浏览文件 @
d3104e78
...
...
@@ -214,7 +214,6 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
...
...
@@ -258,7 +257,12 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
/* ---------------------- OTHER ---------------------------- */
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
);
/* -------------------------------- OTHER -------------------------------- */
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
...
...
include/common/tcommon.h
浏览文件 @
d3104e78
...
...
@@ -101,6 +101,9 @@ void* blockDataDestroy(SSDataBlock* pBlock);
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
void
*
tDecodeDataBlock
(
const
void
*
buf
,
SSDataBlock
*
pBlock
);
int32_t
tEncodeDataBlocks
(
void
**
buf
,
const
SArray
*
blocks
);
void
*
tDecodeDataBlocks
(
const
void
*
buf
,
SArray
*
blocks
);
static
FORCE_INLINE
void
blockDestroyInner
(
SSDataBlock
*
pBlock
)
{
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
...
...
include/common/tmsg.h
浏览文件 @
d3104e78
...
...
@@ -2271,20 +2271,22 @@ enum {
typedef
struct
{
void
*
inputHandle
;
void
*
executor
[
4
]
;
}
SStream
TaskPar
Runner
;
void
*
executor
;
}
SStreamRunner
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
level
;
int8_t
status
;
int8_t
pipeEnd
;
int8_t
parallel
;
int8_t
pipeSource
;
int8_t
pipeSink
;
int8_t
numOfRunners
;
int8_t
parallelizable
;
SEpSet
NextOpEp
;
char
*
qmsg
;
// not applied to encoder and decoder
SStream
TaskParRunner
runner
;
SStream
Runner
runner
[
8
]
;
// void* executor;
// void* stateStore;
// storage handle
...
...
@@ -2316,7 +2318,7 @@ typedef struct {
typedef
struct
{
SStreamExecMsgHead
head
;
// TODO: other info needed by task
SArray
*
data
;
// SArray<SSDataBlock>
}
SStreamTaskExecReq
;
typedef
struct
{
...
...
include/libs/executor/executor.h
浏览文件 @
d3104e78
...
...
@@ -33,15 +33,15 @@ typedef struct SReadHandle {
}
SReadHandle
;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDAT
_BLOCK
0x2
#define STREAM_DATA_TYPE_SSDAT
A_BLOCK
0x2
/**
/**
* Create the exec task for streaming mode
* @param pMsg
* @param streamReadHandle
* @return
*/
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
);
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
);
/**
* Set the input data block for the stream scan.
...
...
@@ -62,7 +62,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
*/
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
SArray
*
tableIdList
,
bool
isAdd
);
/**
/**
* Create the exec task object according to task json
* @param readHandle
* @param vgId
...
...
@@ -71,7 +71,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @param qId
* @return
*/
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
@@ -81,7 +82,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param handle
* @return
*/
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
);
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
,
uint64_t
*
useconds
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
@@ -144,7 +145,8 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param numOfIndex
* @return
*/
//int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
// int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex*
// groupByIndex, int32_t numOfIndex);
/**
* Update the table id list of a given query.
...
...
@@ -173,7 +175,7 @@ void qTaskMgmtNotifyClosing(void* pExecutor);
* Re-open the query handle management module when opening the vnode again.
* @param pExecutor
*/
void
qQueryMgmtReOpen
(
void
*
pExecutor
);
void
qQueryMgmtReOpen
(
void
*
pExecutor
);
/**
* Close query mgmt and clean up resources.
...
...
@@ -188,7 +190,7 @@ void qCleanupTaskMgmt(void* pExecutor);
* @param qInfo
* @return
*/
void
**
qRegisterTask
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
);
void
**
qRegisterTask
(
void
*
pMgmt
,
uint64_t
qId
,
void
*
qInfo
);
/**
* acquire the query handle according to the key from query mgmt object.
...
...
source/client/src/tmq.c
浏览文件 @
d3104e78
...
...
@@ -471,8 +471,8 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
}
sqlLen
=
strlen
(
sql
);
if
(
strlen
(
stream
Name
)
>=
TSDB_TABLE_NAME_LEN
)
{
tscError
(
"
stream
name too long, max length:%d"
,
TSDB_TABLE_NAME_LEN
-
1
);
if
(
strlen
(
tb
Name
)
>=
TSDB_TABLE_NAME_LEN
)
{
tscError
(
"
output tb
name too long, max length:%d"
,
TSDB_TABLE_NAME_LEN
-
1
);
terrno
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
_return
;
}
...
...
source/common/src/tdatablock.c
浏览文件 @
d3104e78
...
...
@@ -1250,3 +1250,26 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
}
return
(
void
*
)
buf
;
}
int32_t
tEncodeDataBlocks
(
void
**
buf
,
const
SArray
*
blocks
)
{
int32_t
tlen
=
0
;
int32_t
sz
=
taosArrayGetSize
(
blocks
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
blocks
,
i
);
tlen
+=
tEncodeDataBlock
(
buf
,
pBlock
);
}
return
tlen
;
}
void
*
tDecodeDataBlocks
(
const
void
*
buf
,
SArray
*
blocks
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
pBlock
=
{
0
};
buf
=
tDecodeDataBlock
(
buf
,
&
pBlock
);
}
return
(
void
*
)
buf
;
}
source/common/src/tmsg.c
浏览文件 @
d3104e78
...
...
@@ -2720,8 +2720,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
pipe
End
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
parallel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
pipe
Sink
)
<
0
)
return
-
1
;
// if (tEncodeI8(pEncoder, pTask->numOfRunners
) < 0) return -1;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
...
...
@@ -2734,8 +2734,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
pipe
End
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
parallel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
pipe
Sink
)
<
0
)
return
-
1
;
// if (tDecodeI8(pDecoder, &pTask->numOfRunners
) < 0) return -1;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
...
...
source/dnode/mgmt/vnode/src/vmMsg.c
浏览文件 @
d3104e78
...
...
@@ -273,6 +273,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_REB
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CUR
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_EXEC
,
(
NodeMsgFp
)
vmProcessFetchMsg
,
0
);
...
...
source/dnode/mnode/impl/inc/mndSnode.h
浏览文件 @
d3104e78
...
...
@@ -24,6 +24,7 @@ extern "C" {
int32_t
mndInitSnode
(
SMnode
*
pMnode
);
void
mndCleanupSnode
(
SMnode
*
pMnode
);
SEpSet
mndAcquireEpFromSnode
(
SMnode
*
pMnode
,
const
SSnodeObj
*
pSnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
d3104e78
...
...
@@ -20,6 +20,7 @@
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
...
...
@@ -31,7 +32,7 @@
#include "tname.h"
#include "tuuid.h"
int32_t
mndPersistTaskDeployReq
(
STrans
*
pTrans
,
SStreamTask
*
pTask
,
const
SEpSet
*
pEpSet
)
{
int32_t
mndPersistTaskDeployReq
(
STrans
*
pTrans
,
SStreamTask
*
pTask
,
const
SEpSet
*
pEpSet
,
tmsg_t
type
)
{
SCoder
encoder
;
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
NULL
,
0
,
TD_ENCODER
);
tEncodeSStreamTask
(
&
encoder
,
pTask
);
...
...
@@ -52,7 +53,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
memcpy
(
&
action
.
epSet
,
pEpSet
,
sizeof
(
SEpSet
));
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_SND_TASK_DEPLOY
;
action
.
msgType
=
type
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
rpcFreeCont
(
buf
);
return
-
1
;
...
...
@@ -69,12 +70,27 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
);
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_VND_TASK_DEPLOY
);
return
0
;
}
SSnodeObj
*
mndSchedFetchSnode
(
SMnode
*
pMnode
)
{
SSnodeObj
*
pObj
=
NULL
;
pObj
=
sdbFetch
(
pMnode
->
pSdb
,
SDB_SNODE
,
NULL
,
(
void
**
)
&
pObj
);
return
pObj
;
}
int32_t
mndAssignTaskToSnode
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamTask
*
pTask
,
SSubplan
*
plan
,
const
SSnodeObj
*
pSnode
)
{
int32_t
msgLen
;
plan
->
execNode
.
nodeId
=
pSnode
->
id
;
plan
->
execNode
.
epSet
=
mndAcquireEpFromSnode
(
pMnode
,
pSnode
);
if
(
qSubPlanToString
(
plan
,
&
pTask
->
qmsg
,
&
msgLen
)
<
0
)
{
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
mndPersistTaskDeployReq
(
pTrans
,
pTask
,
&
plan
->
execNode
.
epSet
,
TDMT_SND_TASK_DEPLOY
);
return
0
;
}
...
...
@@ -113,8 +129,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// send to vnode
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
pTask
->
pipeSink
=
level
==
totLevel
-
1
?
1
:
0
;
// TODO: set to
pTask
->
parallel
=
4
;
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
...
...
@@ -122,34 +138,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
}
else
if
(
plan
->
subplanType
==
SUBPLAN_TYPE_SCAN
)
{
// duplicatable
int32_t
parallel
=
0
;
// if no snode, parallel set to fetch thread num in vnode
// if has snode, set to shared thread num in snode
parallel
=
SND_SHARED_THREAD_NUM
;
}
else
{
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
pTask
->
parallel
=
parallel
;
// TODO:get snode id and ep
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
pTask
->
pipeSink
=
level
==
totLevel
-
1
?
1
:
0
;
SSnodeObj
*
pSnode
=
mndSchedFetchSnode
(
pMnode
);
if
(
pSnode
!=
NULL
)
{
if
(
mndAssignTaskToSnode
(
pMnode
,
pTrans
,
pTask
,
plan
,
pSnode
)
<
0
)
{
sdbRelease
(
pSdb
,
pSnode
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
taosArrayPush
(
taskOneLevel
,
pTask
);
sdbRelease
(
pMnode
->
pSdb
,
pSnode
);
}
else
{
// not duplicatable
SStreamTask
*
pTask
=
streamTaskNew
(
pStream
->
uid
,
level
);
// TODO: get snode
if
(
mndAssignTaskToVg
(
pMnode
,
pTrans
,
pTask
,
plan
,
pVgroup
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
// TODO: assign to one vg
ASSERT
(
0
);
}
taosArrayPush
(
taskOneLevel
,
pTask
);
}
...
...
source/dnode/mnode/impl/src/mndSnode.c
浏览文件 @
d3104e78
...
...
@@ -60,6 +60,15 @@ int32_t mndInitSnode(SMnode *pMnode) {
void
mndCleanupSnode
(
SMnode
*
pMnode
)
{}
SEpSet
mndAcquireEpFromSnode
(
SMnode
*
pMnode
,
const
SSnodeObj
*
pSnode
)
{
SEpSet
epSet
;
memcpy
(
epSet
.
eps
->
fqdn
,
pSnode
->
pDnode
->
fqdn
,
128
);
epSet
.
eps
->
port
=
pSnode
->
pDnode
->
port
;
epSet
.
numOfEps
=
1
;
epSet
.
inUse
=
0
;
return
epSet
;
}
static
SSnodeObj
*
mndAcquireSnode
(
SMnode
*
pMnode
,
int32_t
snodeId
)
{
SSnodeObj
*
pObj
=
sdbAcquire
(
pMnode
->
pSdb
,
SDB_SNODE
,
&
snodeId
);
if
(
pObj
==
NULL
&&
terrno
==
TSDB_CODE_SDB_OBJ_NOT_THERE
)
{
...
...
source/dnode/snode/src/snode.c
浏览文件 @
d3104e78
...
...
@@ -57,8 +57,8 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
for
(
int
i
=
0
;
i
<
pTask
->
parallel
;
i
++
)
{
pTask
->
runner
.
executor
[
i
]
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
for
(
int
i
=
0
;
i
<
pTask
->
numOfRunners
;
i
++
)
{
pTask
->
runner
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
}
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d3104e78
...
...
@@ -70,6 +70,46 @@ void tqClose(STQ* pTq) {
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
tmsg_t
msgType
,
int64_t
version
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
!
pTask
->
pipeSource
)
continue
;
int32_t
workerId
=
0
;
void
*
exec
=
pTask
->
runner
[
workerId
].
executor
;
qSetStreamInput
(
exec
,
msg
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
if
(
pTask
->
pipeSink
)
{
// write back
}
else
{
int32_t
tlen
=
sizeof
(
SStreamExecMsgHead
)
+
tEncodeDataBlocks
(
NULL
,
pRes
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SStreamExecMsgHead
));
tEncodeDataBlocks
(
abuf
,
pRes
);
// serialize
// to next level
}
}
#if 0
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
while (pIter != NULL) {
STqPusher* pusher = *(STqPusher**)pIter;
...
...
@@ -97,6 +137,7 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
// if handle waiting, launch query and response to consumer
//
// if no waiting handle, return
#endif
return
0
;
}
...
...
@@ -420,6 +461,21 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return
0
;
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
ASSERT
(
parallel
<=
8
);
pTask
->
numOfRunners
=
parallel
;
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pVnodeMeta
);
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnodeMeta
,
};
pTask
->
runner
[
i
].
inputHandle
=
pReadHandle
;
pTask
->
runner
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
&
handle
);
}
return
0
;
}
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
malloc
(
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
...
...
@@ -430,12 +486,118 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tDecodeSStreamTask
(
&
decoder
,
pTask
);
tCoderClear
(
&
decoder
);
tqExpandTask
(
pTq
,
pTask
,
8
);
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
return
0
;
}
static
char
*
formatTimestamp
(
char
*
buf
,
int64_t
val
,
int
precision
)
{
time_t
tt
;
int32_t
ms
=
0
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
tt
=
(
time_t
)(
val
/
1000000000
);
ms
=
val
%
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
tt
=
(
time_t
)(
val
/
1000000
);
ms
=
val
%
1000000
;
}
else
{
tt
=
(
time_t
)(
val
/
1000
);
ms
=
val
%
1000
;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if
(
tt
<
0
)
tt
=
0
;
#endif
if
(
tt
<=
0
&&
ms
<
0
)
{
tt
--
;
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
ms
+=
1000000000
;
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
ms
+=
1000000
;
}
else
{
ms
+=
1000
;
}
}
struct
tm
*
ptm
=
localtime
(
&
tt
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
}
else
if
(
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
sprintf
(
buf
+
pos
,
".%06d"
,
ms
);
}
else
{
sprintf
(
buf
+
pos
,
".%03d"
,
ms
);
}
return
buf
;
}
void
tqDebugShowSSData
(
SArray
*
dataBlocks
)
{
char
pBuf
[
128
];
int32_t
sz
=
taosArrayGetSize
(
dataBlocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
dataBlocks
,
i
);
int32_t
colNum
=
pDataBlock
->
info
.
numOfCols
;
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
printf
(
"|"
);
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
formatTimestamp
(
pBuf
,
*
(
uint64_t
*
)
var
,
TSDB_TIME_PRECISION_MILLI
);
printf
(
" %25s |"
,
pBuf
);
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
break
;
}
}
printf
(
"
\n
"
);
}
}
}
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
SRpcMsg
*
msg
)
{
//
SStreamTaskExecReq
*
pReq
=
msg
->
pCont
;
int32_t
taskId
=
pReq
->
head
.
streamTaskId
;
int32_t
workerType
=
pReq
->
head
.
workerType
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
// assume worker id is 1
int32_t
workerId
=
1
;
void
*
exec
=
pTask
->
runner
[
workerId
].
executor
;
int32_t
sz
=
taosArrayGetSize
(
pReq
->
data
);
printf
(
"input data:
\n
"
);
tqDebugShowSSData
(
pReq
->
data
);
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
input
=
taosArrayGet
(
pReq
->
data
,
i
);
SSDataBlock
*
output
;
uint64_t
ts
;
qSetStreamInput
(
exec
,
input
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
&
output
);
}
printf
(
"output data:
\n
"
);
tqDebugShowSSData
(
pRes
);
return
0
;
}
source/libs/executor/src/executor.c
浏览文件 @
d3104e78
...
...
@@ -52,9 +52,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
SSDataBlock
*
pDataBlock
=
input
;
pInfo
->
pRes
->
info
=
pDataBlock
->
info
;
for
(
int32_t
i
=
0
;
i
<
pInfo
->
pRes
->
info
.
numOfCols
;
++
i
)
{
pInfo
->
pRes
->
pDataBlock
=
pDataBlock
->
pDataBlock
;
}
taosArrayClear
(
pInfo
->
pRes
->
pDataBlock
);
taosArrayAddAll
(
pInfo
->
pRes
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
// set current block valid.
pInfo
->
blockValid
=
true
;
...
...
@@ -121,7 +120,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
// traverse to the streamscan node to add this table id
SOperatorInfo
*
pInfo
=
pTaskInfo
->
pRoot
;
while
(
pInfo
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
while
(
pInfo
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
pInfo
=
pInfo
->
pDownstream
[
0
];
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录