Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8eb08fa0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8eb08fa0
编写于
6月 15, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(stream): trigger mode add max delay
上级
ca339748
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
261 addition
and
48 deletion
+261
-48
examples/c/stream_demo.c
examples/c/stream_demo.c
+2
-1
include/libs/executor/executor.h
include/libs/executor/executor.h
+6
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+31
-1
source/client/src/tmq.c
source/client/src/tmq.c
+84
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-2
source/dnode/mnode/impl/inc/mndStream.h
source/dnode/mnode/impl/inc/mndStream.h
+1
-1
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+4
-4
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+3
-0
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+3
-2
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+8
-9
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+13
-8
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+13
-7
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+2
-2
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+7
-0
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+67
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+8
-6
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-0
source/util/src/tarray.c
source/util/src/tarray.c
+5
-1
未找到文件。
examples/c/stream_demo.c
浏览文件 @
8eb08fa0
...
...
@@ -82,7 +82,8 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"
);
pConn
,
"create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 interval(10m)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/libs/executor/executor.h
浏览文件 @
8eb08fa0
...
...
@@ -38,8 +38,10 @@ typedef struct SReadHandle {
SMsgCb
*
pMsgCb
;
}
SReadHandle
;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
enum
{
STREAM_DATA_TYPE_SUBMIT_BLOCK
=
1
,
STREAM_DATA_TYPE_SSDATA_BLOCK
=
2
,
};
typedef
enum
{
OPTR_EXEC_MODEL_BATCH
=
0x1
,
...
...
@@ -102,7 +104,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param tversion
* @return
*/
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
);
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
include/libs/stream/tstream.h
浏览文件 @
8eb08fa0
...
...
@@ -58,6 +58,7 @@ enum {
enum
{
STREAM_INPUT__DATA_SUBMIT
=
1
,
STREAM_INPUT__DATA_BLOCK
,
STREAM_INPUT__TRIGGER
,
STREAM_INPUT__CHECKPOINT
,
};
...
...
@@ -85,6 +86,11 @@ typedef struct {
int8_t
type
;
}
SStreamCheckpoint
;
typedef
struct
{
int8_t
type
;
SSDataBlock
*
pBlock
;
}
SStreamTrigger
;
enum
{
STREAM_QUEUE__SUCESS
=
1
,
STREAM_QUEUE__FAILED
,
...
...
@@ -98,6 +104,9 @@ typedef struct {
int8_t
status
;
}
SStreamQueue
;
int32_t
streamInit
();
void
streamCleanUp
();
SStreamQueue
*
streamQueueOpen
();
void
streamQueueClose
(
SStreamQueue
*
queue
);
...
...
@@ -220,6 +229,11 @@ enum {
TASK_INPUT_TYPE__DATA_BLOCK
,
};
enum
{
TASK_TRIGGER_STATUS__IN_ACTIVE
=
1
,
TASK_TRIGGER_STATUS__ACTIVE
,
};
struct
SStreamTask
{
int64_t
streamId
;
int32_t
taskId
;
...
...
@@ -262,8 +276,16 @@ struct SStreamTask {
SStreamQueue
*
inputQueue
;
SStreamQueue
*
outputQueue
;
// trigger
int8_t
triggerStatus
;
int64_t
triggerParam
;
void
*
timer
;
// application storage
// void* ahandle;
// msg handle
SMsgCb
*
pMsgCb
;
};
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
...
...
@@ -292,6 +314,13 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__TRIGGER
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
if
(
pItem
->
type
!=
STREAM_INPUT__TRIGGER
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
&&
pTask
->
triggerStatus
==
TASK_TRIGGER_STATUS__IN_ACTIVE
)
{
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
// TODO: back pressure
...
...
@@ -370,7 +399,8 @@ typedef struct {
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
);
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
);
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
);
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
...
...
source/client/src/tmq.c
浏览文件 @
8eb08fa0
...
...
@@ -992,6 +992,90 @@ CREATE_MSG_FAIL:
return
-
1
;
}
bool
tmqUpdateEp2
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqAskEpRsp
*
pRsp
)
{
bool
set
=
false
;
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
tscDebug
(
"consumer %ld update ep epoch %d to epoch %d, topic num: %d"
,
tmq
->
consumerId
,
tmq
->
epoch
,
epoch
,
topicNumGet
);
SArray
*
newTopics
=
taosArrayInit
(
topicNumGet
,
sizeof
(
SMqClientTopic
));
if
(
newTopics
==
NULL
)
{
return
false
;
}
SHashObj
*
pHash
=
taosHashInit
(
64
,
MurmurHash3_32
,
false
,
HASH_NO_LOCK
);
if
(
pHash
==
NULL
)
{
taosArrayDestroy
(
newTopics
);
return
false
;
}
int32_t
topicNumCur
=
taosArrayGetSize
(
tmq
->
clientTopics
);
for
(
int32_t
i
=
0
;
i
<
topicNumCur
;
i
++
)
{
// find old topic
SMqClientTopic
*
pTopicCur
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
if
(
pTopicCur
->
vgs
)
{
int32_t
vgNumCur
=
taosArrayGetSize
(
pTopicCur
->
vgs
);
tscDebug
(
"consumer %ld new vg num: %d"
,
tmq
->
consumerId
,
vgNumCur
);
if
(
vgNumCur
==
0
)
break
;
for
(
int32_t
j
=
0
;
j
<
vgNumCur
;
j
++
)
{
SMqClientVg
*
pVgCur
=
taosArrayGet
(
pTopicCur
->
vgs
,
j
);
sprintf
(
vgKey
,
"%s:%d"
,
pTopicCur
->
topicName
,
pVgCur
->
vgId
);
tscDebug
(
"consumer %ld epoch %d vg %d build %s"
,
tmq
->
consumerId
,
epoch
,
pVgCur
->
vgId
,
vgKey
);
taosHashPut
(
pHash
,
vgKey
,
strlen
(
vgKey
),
&
pVgCur
->
currentOffset
,
sizeof
(
int64_t
));
}
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
topicNumGet
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
pRsp
->
topics
,
i
);
topic
.
schema
=
pTopicEp
->
schema
;
taosHashClear
(
pHash
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
tstrncpy
(
topic
.
db
,
pTopicEp
->
db
,
TSDB_DB_FNAME_LEN
);
tscDebug
(
"consumer %ld update topic: %s"
,
tmq
->
consumerId
,
topic
.
topicName
);
int32_t
vgNumGet
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgNumGet
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgNumGet
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
sprintf
(
vgKey
,
"%s:%d"
,
topic
.
topicName
,
pVgEp
->
vgId
);
int64_t
*
pOffset
=
taosHashGet
(
pHash
,
vgKey
,
strlen
(
vgKey
));
int64_t
offset
=
tmq
->
resetOffsetCfg
;
if
(
pOffset
!=
NULL
)
{
offset
=
*
pOffset
;
}
tscDebug
(
"consumer %ld(epoch %d) offset of vg %d updated to %ld"
,
tmq
->
consumerId
,
epoch
,
pVgEp
->
vgId
,
offset
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
currentOffset
=
offset
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
,
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
.
vgSkipCnt
=
0
,
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
newTopics
,
&
topic
);
}
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
taosHashCleanup
(
pHash
);
tmq
->
clientTopics
=
newTopics
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__NO_TOPIC
);
else
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
atomic_store_32
(
&
tmq
->
epoch
,
epoch
);
return
set
;
}
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqAskEpRsp
*
pRsp
)
{
/*printf("call update ep %d\n", epoch);*/
bool
set
=
false
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
8eb08fa0
...
...
@@ -554,8 +554,8 @@ typedef struct {
SVgObj
fixedSinkVg
;
int64_t
smaId
;
// 0 for unused
int8_t
trigger
;
int
32
_t
triggerParam
;
int64_t
water
M
ark
;
int
64
_t
triggerParam
;
int64_t
water
m
ark
;
char
*
sql
;
char
*
physicalPlan
;
SArray
*
tasks
;
// SArray<SArray<SStreamTask>>
...
...
source/dnode/mnode/impl/inc/mndStream.h
浏览文件 @
8eb08fa0
...
...
@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw
*
mndStreamActionEncode
(
SStreamObj
*
pStream
);
SSdbRow
*
mndStreamActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
);
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
8eb08fa0
...
...
@@ -33,8 +33,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
water
M
ark
)
<
0
)
return
-
1
;
if
(
tEncodeI
64
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
water
m
ark
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
...
...
@@ -85,8 +85,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
water
M
ark
)
<
0
)
return
-
1
;
if
(
tDecodeI
64
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
water
m
ark
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
8eb08fa0
...
...
@@ -387,6 +387,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// input
pFinalTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// trigger
pFinalTask
->
triggerParam
=
pStream
->
triggerParam
;
// dispatch
if
(
mndAddDispatcherToInnerTask
(
pMnode
,
pTrans
,
pStream
,
pFinalTask
)
<
0
)
{
qDestroyQueryPlan
(
pPlan
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
8eb08fa0
...
...
@@ -561,6 +561,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj
.
sql
=
pCreate
->
sql
;
streamObj
.
createdBy
=
STREAM_CREATED_BY__SMA
;
streamObj
.
smaId
=
smaObj
.
uid
;
streamObj
.
watermark
=
0
;
streamObj
.
trigger
=
STREAM_TRIGGER_AT_ONCE
;
if
(
mndAllocSmaVgroup
(
pMnode
,
pDb
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
{
mError
(
"sma:%s, failed to create since %s"
,
smaObj
.
name
,
terrstr
());
...
...
@@ -583,7 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if
(
mndSetUpdateSmaStbCommitLogs
(
pMnode
,
pTrans
,
pStb
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoActions
(
pMnode
,
pTrans
,
pDb
,
&
streamObj
.
fixedSinkVg
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
STREAM_TRIGGER_AT_ONCE
,
0
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
code
=
0
;
...
...
@@ -1012,7 +1014,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
rsp
->
suid
=
pStb
->
uid
;
rsp
->
version
=
pStb
->
smaVer
;
mndReleaseStb
(
pMnode
,
pStb
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_SMA
,
pIter
,
(
void
**
)
&
pSma
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
8eb08fa0
...
...
@@ -235,16 +235,15 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesNodeToString
((
SNode
*
)
pPlan
,
false
,
pStr
,
NULL
);
code
=
nodesNodeToString
((
SNode
*
)
pPlan
,
false
,
pStr
,
NULL
);
}
nodesDestroyNode
(
pAst
);
nodesDestroyNode
((
SNode
*
)
pPlan
);
nodesDestroyNode
((
SNode
*
)
pPlan
);
terrno
=
code
;
return
code
;
}
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
int8_t
triggerType
,
int64_t
watermark
,
STrans
*
pTrans
)
{
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
...
...
@@ -258,7 +257,6 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
// free
nodesDestroyNode
(
pAst
);
#if 0
printf("|");
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
...
...
@@ -268,7 +266,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
#endif
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
triggerType
,
watermark
,
&
pStream
->
physicalPlan
))
{
if
(
TSDB_CODE_SUCCESS
!=
mndStreamGetPlanString
(
ast
,
pStream
->
trigger
,
pStream
->
watermark
,
&
pStream
->
physicalPlan
))
{
mError
(
"topic:%s, failed to get plan since %s"
,
pStream
->
name
,
terrstr
());
return
-
1
;
}
...
...
@@ -391,7 +389,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj
.
smaId
=
0
;
/*streamObj.physicalPlan = "";*/
streamObj
.
trigger
=
pCreate
->
triggerType
;
streamObj
.
waterMark
=
pCreate
->
watermark
;
streamObj
.
watermark
=
pCreate
->
watermark
;
streamObj
.
triggerParam
=
pCreate
->
maxDelay
;
if
(
streamObj
.
targetSTbName
[
0
])
{
pDb
=
mndAcquireDbByStb
(
pMnode
,
streamObj
.
targetSTbName
);
...
...
@@ -409,7 +408,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
}
mDebug
(
"trans:%d, used to create stream:%s"
,
pTrans
->
id
,
pCreate
->
name
);
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
p
Create
->
triggerType
,
pCreate
->
watermark
,
p
Trans
)
!=
0
)
{
if
(
mndAddStreamToTrans
(
pMnode
,
&
streamObj
,
pCreate
->
ast
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to add stream since %s"
,
pTrans
->
id
,
terrstr
());
mndTransDrop
(
pTrans
);
return
-
1
;
...
...
@@ -566,7 +565,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
targetSTbName
,
true
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
water
M
ark
,
false
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
water
m
ark
,
false
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pStream
->
trigger
,
false
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
8eb08fa0
...
...
@@ -375,6 +375,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
pTask
->
inputQueue
==
NULL
||
pTask
->
outputQueue
==
NULL
)
goto
FAIL
;
pTask
->
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
// expand runners
...
...
@@ -406,9 +408,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
}
streamSetupTrigger
(
pTask
);
tqInfo
(
"deploy stream task id %d child id %d on vg %d"
,
pTask
->
taskId
,
pTask
->
childId
,
pTq
->
pVnode
->
config
.
vgId
);
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
&
pTask
,
sizeof
(
void
*
));
return
0
;
FAIL:
...
...
@@ -431,7 +436,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
SStreamTask
*
pTask
=
*
(
SStreamTask
*
*
)
pIter
;
if
(
pTask
->
inputType
!=
STREAM_INPUT__DATA_SUBMIT
)
continue
;
if
(
!
failed
)
{
...
...
@@ -439,7 +444,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
continue
;
}
if
(
stream
Trigger
ByWrite
(
pTask
,
pTq
->
pVnode
->
config
.
vgId
,
&
pTq
->
pVnode
->
msgCb
)
<
0
)
{
if
(
stream
Launch
ByWrite
(
pTask
,
pTq
->
pVnode
->
config
.
vgId
,
&
pTq
->
pVnode
->
msgCb
)
<
0
)
{
continue
;
}
}
else
{
...
...
@@ -459,7 +464,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
//
SStreamTaskRunReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamTaskProcessRunReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
);
return
0
;
}
...
...
@@ -473,7 +478,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderInit
(
&
decoder
,
msgBody
,
msgLen
);
tDecodeStreamDispatchReq
(
&
decoder
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SRpcMsg
rsp
=
{
.
info
=
pMsg
->
info
,
.
code
=
0
,
...
...
@@ -485,7 +490,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamTaskRecoverReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
}
...
...
@@ -493,7 +498,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamDispatchRsp
*
pRsp
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
return
0
;
}
...
...
@@ -501,7 +506,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
streamProcessRecoverRsp
(
pTask
,
pRsp
);
return
0
;
}
source/libs/executor/src/executor.c
浏览文件 @
8eb08fa0
...
...
@@ -19,7 +19,8 @@
#include "tdatablock.h"
#include "vnode.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
char
*
id
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
...
...
@@ -43,6 +44,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
if
(
pInfo
->
blockType
==
0
)
{
pInfo
->
blockType
=
type
;
}
else
if
(
pInfo
->
blockType
!=
type
)
{
ASSERT
(
0
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
@@ -51,7 +53,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
else
{
}
else
if
(
type
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
for
(
int32_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
&
((
SSDataBlock
*
)
input
)[
i
];
...
...
@@ -62,6 +64,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
}
else
{
ASSERT
(
0
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -83,7 +87,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
assignUid
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
assignUid
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
...
...
@@ -162,7 +167,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo
=
pInfo
->
pDownstream
[
0
];
}
int32_t
code
=
0
;
int32_t
code
=
0
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
...
...
@@ -178,9 +183,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return
code
;
}
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
ASSERT
(
tinfo
!=
NULL
&&
dbName
!=
NULL
&&
tableName
!=
NULL
);
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
*
sversion
=
pTaskInfo
->
schemaVer
.
sversion
;
*
tversion
=
pTaskInfo
->
schemaVer
.
tversion
;
...
...
@@ -196,4 +202,4 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
}
return
0
;
}
\ No newline at end of file
}
source/libs/function/src/builtins.c
浏览文件 @
8eb08fa0
...
...
@@ -1682,7 +1682,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"top"
,
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
translateFunc
=
translateTopBot
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
...
...
@@ -1717,7 +1717,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"bottom"
,
.
type
=
FUNCTION_TYPE_BOTTOM
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
,
.
classification
=
FUNC_MGT_AGG_FUNC
|
FUNC_MGT_SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS_FUNC
|
FUNC_MGT_FORBID_STREAM_FUNC
,
.
translateFunc
=
translateTopBot
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
topBotFunctionSetup
,
...
...
source/libs/stream/inc/streamInc.h
浏览文件 @
8eb08fa0
...
...
@@ -23,6 +23,13 @@
extern
"C"
{
#endif
typedef
struct
{
int8_t
inited
;
void
*
timer
;
}
SStreamGlobalEnv
;
static
SStreamGlobalEnv
streamEnv
;
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamDispatchReqToData
(
const
SStreamDispatchReq
*
pReq
,
SStreamDataBlock
*
pData
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
8eb08fa0
...
...
@@ -14,8 +14,74 @@
*/
#include "streamInc.h"
#include "ttimer.h"
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamInit
()
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
&
streamEnv
.
inited
,
0
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
0
)
{
streamEnv
.
timer
=
taosTmrInit
(
10000
,
100
,
10000
,
"STREAM"
);
if
(
streamEnv
.
timer
==
NULL
)
{
atomic_store_8
(
&
streamEnv
.
inited
,
0
);
return
-
1
;
}
atomic_store_8
(
&
streamEnv
.
inited
,
1
);
}
return
0
;
}
void
streamCleanUp
()
{
int8_t
old
;
while
(
1
)
{
old
=
atomic_val_compare_exchange_8
(
&
streamEnv
.
inited
,
1
,
2
);
if
(
old
!=
2
)
break
;
}
if
(
old
==
1
)
{
taosTmrCleanUp
(
streamEnv
.
timer
);
atomic_store_8
(
&
streamEnv
.
inited
,
0
);
}
}
void
streamTriggerByTimer
(
void
*
param
,
void
*
tmrId
)
{
SStreamTask
*
pTask
=
(
void
*
)
param
;
if
(
atomic_load_8
(
&
pTask
->
triggerStatus
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
SStreamTrigger
*
trigger
=
taosAllocateQitem
(
sizeof
(
SStreamTrigger
),
DEF_QITEM
);
if
(
trigger
==
NULL
)
return
;
trigger
->
type
=
STREAM_INPUT__TRIGGER
;
trigger
->
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
if
(
trigger
->
pBlock
==
NULL
)
{
taosFreeQitem
(
trigger
);
return
;
}
trigger
->
pBlock
->
info
.
type
=
STREAM_GET_ALL
;
atomic_store_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
trigger
);
streamLaunchByWrite
(
pTask
,
pTask
->
nodeId
,
pTask
->
pMsgCb
);
}
taosTmrReset
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
,
&
pTask
->
timer
);
}
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
)
{
if
(
pTask
->
triggerParam
!=
0
)
{
if
(
streamInit
()
<
0
)
{
return
-
1
;
}
pTask
->
timer
=
taosTmrStart
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
);
pTask
->
triggerStatus
=
TASK_TRIGGER_STATUS__IN_ACTIVE
;
}
return
0
;
}
int32_t
streamLaunchByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
)
{
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
8eb08fa0
...
...
@@ -20,15 +20,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
if
(
pItem
->
type
==
STREAM_INPUT__TRIGGER
)
{
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
ASSERT
(
pSubmit
->
type
==
STREAM_INPUT__DATA_SUBMIT
);
ASSERT
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
false
);
}
else
if
(
p
Task
->
inputT
ype
==
STREAM_INPUT__DATA_BLOCK
)
{
}
else
if
(
p
Item
->
t
ype
==
STREAM_INPUT__DATA_BLOCK
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
ASSERT
(
pTask
->
inputType
==
STREAM_INPUT__DATA_BLOCK
);
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
8eb08fa0
...
...
@@ -72,6 +72,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
}
if
(
tEncodeI64
(
pEncoder
,
pTask
->
triggerParam
)
<
0
)
return
-
1
;
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
...
...
@@ -121,6 +122,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
triggerParam
)
<
0
)
return
-
1
;
/*tEndDecode(pDecoder);*/
return
0
;
...
...
source/util/src/tarray.c
浏览文件 @
8eb08fa0
...
...
@@ -174,7 +174,11 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void
*
taosArrayAddAll
(
SArray
*
pArray
,
const
SArray
*
pInput
)
{
return
taosArrayAddBatch
(
pArray
,
pInput
->
pData
,
(
int32_t
)
taosArrayGetSize
(
pInput
));
if
(
pInput
)
{
return
taosArrayAddBatch
(
pArray
,
pInput
->
pData
,
(
int32_t
)
taosArrayGetSize
(
pInput
));
}
else
{
return
NULL
;
}
}
void
*
taosArrayPop
(
SArray
*
pArray
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录