Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4043e5c6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4043e5c6
编写于
6月 17, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13954 from taosdata/feature/stream
fix(stream): build ctb name
上级
b0a2bcce
384b02d2
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
24 addition
and
22 deletion
+24
-22
examples/c/stream_demo.c
examples/c/stream_demo.c
+10
-3
include/client/taos.h
include/client/taos.h
+5
-16
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+3
-1
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+1
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-2
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+2
-0
未找到文件。
examples/c/stream_demo.c
浏览文件 @
4043e5c6
...
...
@@ -32,6 +32,13 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create database if not exists abc2 vgroups 20"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
...
...
@@ -81,9 +88,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 interval(10m)
"
);
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger at_once into abc2.outstb as select _wstartts, sum(k) from st1 "
"partition by tbname interval(10m)
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/client/taos.h
浏览文件 @
4043e5c6
...
...
@@ -209,15 +209,6 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
/* --------------------------TMQ INTERFACE------------------------------- */
#if 0
enum {
TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0,
};
typedef int32_t tmq_resp_err_t;
#endif
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
...
...
@@ -236,15 +227,13 @@ DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT
int32_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
int32_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
DLL_EXPORT
int32_t
tmq_subscription
(
tmq_t
*
tmq
,
tmq_list_t
**
topics
);
// timeout: -1 means infinitely waiting
DLL_EXPORT
int32_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
int32_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
DLL_EXPORT
int32_t
tmq_subscription
(
tmq_t
*
tmq
,
tmq_list_t
**
topics
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
);
DLL_EXPORT
int32_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
int32_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
);
DLL_EXPORT
int32_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
TAOS_RES
*
msg
,
tmq_commit_cb
*
cb
,
void
*
param
);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
...
...
source/common/src/tdatablock.c
浏览文件 @
4043e5c6
...
...
@@ -1713,6 +1713,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
}
char
*
buildCtbNameByGroupId
(
const
char
*
stbName
,
uint64_t
groupId
)
{
ASSERT
(
stbName
[
0
]
!=
0
);
SArray
*
tags
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SSmlKv
*
pTag
=
taosMemoryCalloc
(
1
,
sizeof
(
SSmlKv
));
pTag
->
key
=
"group_id"
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
4043e5c6
...
...
@@ -105,7 +105,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
int32_t
size
=
encoder
.
pos
;
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
size
;
tEncoderClear
(
&
encoder
);
void
*
buf
=
taosMemory
Malloc
(
tlen
);
void
*
buf
=
taosMemory
Calloc
(
1
,
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -157,6 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
}
sdbRelease
(
pMnode
->
pSdb
,
pDb
);
memcpy
(
pTask
->
shuffleDispatcher
.
stbFullName
,
pStream
->
targetSTbName
,
TSDB_TABLE_FNAME_LEN
);
SArray
*
pVgs
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
pVgs
);
SArray
*
sinkLv
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
...
...
@@ -166,6 +167,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
for
(
int32_t
j
=
0
;
j
<
sinkLvSize
;
j
++
)
{
SStreamTask
*
pLastLevelTask
=
taosArrayGetP
(
sinkLv
,
j
);
if
(
pLastLevelTask
->
nodeId
==
pVgInfo
->
vgId
)
{
ASSERT
(
pVgInfo
->
vgId
>
0
);
pVgInfo
->
taskId
=
pLastLevelTask
->
taskId
;
ASSERT
(
pVgInfo
->
taskId
!=
0
);
break
;
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
4043e5c6
...
...
@@ -134,6 +134,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
int32_t
sz
=
taosArrayGetSize
(
vgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
i
);
ASSERT
(
pVgInfo
->
vgId
>
0
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
vgId
=
pVgInfo
->
vgId
;
downstreamTaskId
=
pVgInfo
->
taskId
;
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
4043e5c6
...
...
@@ -70,7 +70,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
shuffleDispatcher
.
stbFullName
)
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
pEncoder
,
pTask
->
triggerParam
)
<
0
)
return
-
1
;
...
...
@@ -119,8 +119,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pTask
->
shuffleDispatcher
.
stbFullName
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
triggerParam
)
<
0
)
return
-
1
;
...
...
tests/system-test/fulltest.sh
浏览文件 @
4043e5c6
...
...
@@ -115,6 +115,8 @@ python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py
-f
7-tmq/subscribeDb.py
python3 ./test.py
-f
7-tmq/subscribeDb0.py
python3 ./test.py
-f
7-tmq/subscribeDb1.py
python3 ./test.py
-f
7-tmq/subscribeDb2.py
python3 ./test.py
-f
7-tmq/subscribeDb3.py
python3 ./test.py
-f
7-tmq/subscribeStb.py
python3 ./test.py
-f
7-tmq/subscribeStb0.py
python3 ./test.py
-f
7-tmq/subscribeStb1.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录