Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
edb9da84
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
edb9da84
编写于
6月 06, 2022
作者:
L
Liu Jicong
提交者:
GitHub
6月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13507 from taosdata/feature/stream
refactor(stream)
上级
6de91932
2677a3c8
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
1006 addition
and
735 deletion
+1006
-735
include/common/tcommon.h
include/common/tcommon.h
+12
-10
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+122
-49
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+5
-5
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+5
-5
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+21
-46
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+2
-2
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+5
-6
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+5
-5
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-2
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+9
-3
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+112
-0
source/libs/stream/src/streamData.c
source/libs/stream/src/streamData.c
+85
-0
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+123
-0
source/libs/stream/src/streamMsg.c
source/libs/stream/src/streamMsg.c
+150
-0
source/libs/stream/src/streamQueue.c
source/libs/stream/src/streamQueue.c
+43
-0
source/libs/stream/src/streamSink.c
source/libs/stream/src/streamSink.c
+157
-0
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+138
-0
source/libs/stream/src/streamUpdate.c
source/libs/stream/src/streamUpdate.c
+0
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+0
-594
source/util/src/tqueue.c
source/util/src/tqueue.c
+10
-8
未找到文件。
include/common/tcommon.h
浏览文件 @
edb9da84
...
...
@@ -77,7 +77,9 @@ typedef struct SDataBlockInfo {
int16_t
numOfCols
;
int16_t
hasVarCol
;
int32_t
capacity
;
EStreamType
type
;
// TODO: optimize and remove following
int32_t
childId
;
// used for stream, do not serialize
EStreamType
type
;
// used for stream, do not serialize
}
SDataBlockInfo
;
typedef
struct
SSDataBlock
{
...
...
@@ -105,14 +107,14 @@ typedef struct SColumnInfoData {
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
//STimeWindow twindow;
//
STimeWindow twindow;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
int32_t
numOfTWindows
;
STimeWindow
*
twindows
;
STimeWindow
*
twindows
;
}
SQueryTableDataCond
;
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
...
...
@@ -202,17 +204,17 @@ typedef struct SExprInfo {
}
SExprInfo
;
typedef
struct
{
const
char
*
key
;
int32_t
keyLen
;
uint8_t
type
;
union
{
const
char
*
key
;
int32_t
keyLen
;
uint8_t
type
;
union
{
const
char
*
value
;
int64_t
i
;
uint64_t
u
;
double
d
;
float
f
;
};
int32_t
length
;
int32_t
length
;
}
SSmlKv
;
#define QUERY_ASC_FORWARD_STEP 1
...
...
@@ -220,7 +222,7 @@ typedef struct {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define SORT_QSORT_T 0x1
#define SORT_QSORT_T
0x1
#define SORT_SPILLED_MERGE_SORT_T 0x2
typedef
struct
SSortExecInfo
{
int32_t
sortMethod
;
...
...
include/libs/stream/tstream.h
浏览文件 @
edb9da84
...
...
@@ -24,8 +24,8 @@
extern
"C"
{
#endif
#ifndef _
T
STREAM_H_
#define _
T
STREAM_H_
#ifndef _STREAM_H_
#define _STREAM_H_
typedef
struct
SStreamTask
SStreamTask
;
...
...
@@ -39,6 +39,7 @@ enum {
TASK_INPUT_STATUS__NORMAL
=
1
,
TASK_INPUT_STATUS__BLOCKED
,
TASK_INPUT_STATUS__RECOVER
,
TASK_INPUT_STATUS__PROCESSING
,
TASK_INPUT_STATUS__STOP
,
TASK_INPUT_STATUS__FAILED
,
};
...
...
@@ -60,6 +61,10 @@ enum {
STREAM_INPUT__CHECKPOINT
,
};
typedef
struct
{
int8_t
type
;
}
SStreamQueueItem
;
typedef
struct
{
int8_t
type
;
int64_t
ver
;
...
...
@@ -80,55 +85,51 @@ typedef struct {
int8_t
type
;
}
SStreamCheckpoint
;
enum
{
STREAM_QUEUE__SUCESS
=
1
,
STREAM_QUEUE__FAILED
,
STREAM_QUEUE__PROCESSING
,
};
typedef
struct
{
STaosQueue
*
queue
;
STaosQall
*
qall
;
void
*
qItem
;
int8_t
failed
;
}
SStreamQ
;
int8_t
status
;
}
SStreamQ
ueue
;
static
FORCE_INLINE
void
*
streamQCurItem
(
SStreamQ
*
queue
)
{
//
return
queue
->
qItem
;
SStreamQueue
*
streamQueueOpen
();
void
streamQueueClose
(
SStreamQueue
*
queue
);
static
FORCE_INLINE
void
streamQueueProcessSuccess
(
SStreamQueue
*
queue
)
{
ASSERT
(
atomic_load_8
(
&
queue
->
status
)
==
STREAM_QUEUE__PROCESSING
);
queue
->
qItem
=
NULL
;
atomic_store_8
(
&
queue
->
status
,
STREAM_QUEUE__SUCESS
);
}
static
FORCE_INLINE
void
*
streamQNextItem
(
SStreamQ
*
queue
)
{
int8_t
failed
=
atomic_load_8
(
&
queue
->
failed
);
if
(
failed
)
{
static
FORCE_INLINE
void
streamQueueProcessFail
(
SStreamQueue
*
queue
)
{
ASSERT
(
atomic_load_8
(
&
queue
->
status
)
==
STREAM_QUEUE__PROCESSING
);
atomic_store_8
(
&
queue
->
status
,
STREAM_QUEUE__FAILED
);
}
static
FORCE_INLINE
void
*
streamQueueCurItem
(
SStreamQueue
*
queue
)
{
return
queue
->
qItem
;
}
static
FORCE_INLINE
void
*
streamQueueNextItem
(
SStreamQueue
*
queue
)
{
int8_t
dequeueFlag
=
atomic_exchange_8
(
&
queue
->
status
,
STREAM_QUEUE__PROCESSING
);
if
(
dequeueFlag
==
STREAM_QUEUE__FAILED
)
{
ASSERT
(
queue
->
qItem
!=
NULL
);
return
streamQCurItem
(
queue
);
return
streamQ
ueue
CurItem
(
queue
);
}
else
{
taosGetQitem
(
queue
->
qall
,
&
queue
->
qItem
);
if
(
queue
->
qItem
==
NULL
)
{
taosReadAllQitems
(
queue
->
queue
,
queue
->
qall
);
taosGetQitem
(
queue
->
qall
,
&
queue
->
qItem
);
}
return
streamQCurItem
(
queue
);
return
streamQ
ueue
CurItem
(
queue
);
}
}
static
FORCE_INLINE
void
streamQSetFail
(
SStreamQ
*
queue
)
{
atomic_store_8
(
&
queue
->
failed
,
1
);
}
static
FORCE_INLINE
void
streamQSetSuccess
(
SStreamQ
*
queue
)
{
atomic_store_8
(
&
queue
->
failed
,
0
);
}
static
FORCE_INLINE
SStreamDataSubmit
*
streamDataSubmitNew
(
SSubmitReq
*
pReq
)
{
SStreamDataSubmit
*
pDataSubmit
=
(
SStreamDataSubmit
*
)
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pDataSubmit
==
NULL
)
return
NULL
;
pDataSubmit
->
dataRef
=
(
int32_t
*
)
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pDataSubmit
->
dataRef
==
NULL
)
goto
FAIL
;
pDataSubmit
->
data
=
pReq
;
*
pDataSubmit
->
dataRef
=
1
;
pDataSubmit
->
type
=
STREAM_INPUT__DATA_SUBMIT
;
return
pDataSubmit
;
FAIL:
taosFreeQitem
(
pDataSubmit
);
return
NULL
;
}
static
FORCE_INLINE
void
streamDataSubmitRefInc
(
SStreamDataSubmit
*
pDataSubmit
)
{
//
atomic_add_fetch_32
(
pDataSubmit
->
dataRef
,
1
);
}
SStreamDataSubmit
*
streamDataSubmitNew
(
SSubmitReq
*
pReq
);
static
FORCE_INLINE
void
streamDataSubmitRefDec
(
SStreamDataSubmit
*
pDataSubmit
)
{
int32_t
ref
=
atomic_sub_fetch_32
(
pDataSubmit
->
dataRef
,
1
);
...
...
@@ -141,9 +142,31 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
SStreamDataSubmit
*
streamSubmitRefClone
(
SStreamDataSubmit
*
pSubmit
);
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
static FORCE_INLINE int32_t streamEnqueue1(SStreamQueue* queue, SStreamQueueItem* pItem) {
int8_t inputStatus = atomic_load_8(&queue->enqueueStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) {
atomic_store_8(&queue->enqueueStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
taosWriteQitem(queue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
taosWriteQitem(queue->queue, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(queue->queue, pItem);
}
return 0;
}
return 0;
}
#endif
typedef
struct
{
int8_t
parallelizable
;
char
*
qmsg
;
...
...
@@ -236,13 +259,17 @@ struct SStreamTask {
int8_t
dispatchType
;
int16_t
dispatchMsgType
;
// node info
int32_t
childId
;
int32_t
nodeId
;
SEpSet
epSet
;
// exec
STaskExec
exec
;
// local sink
// TODO: merge sink and dispatch
// local sink
union
{
STaskSinkTb
tbSink
;
STaskSinkSma
smaSink
;
...
...
@@ -258,25 +285,61 @@ struct SStreamTask {
int8_t
inputStatus
;
int8_t
outputStatus
;
#if 0
STaosQueue* inputQ;
STaosQall* inputQAll;
STaosQueue* outputQ;
STaosQall* outputQAll;
#endif
SStreamQueue
*
inputQueue
;
SStreamQueue
*
outputQueue
;
// application storage
void
*
ahandle
;
};
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
);
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
,
int32_t
childId
);
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
);
int32_t
tDecodeSStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
typedef
struct
{
// SMsgHead head;
SStreamTask
*
task
;
}
SStreamTaskDeployReq
;
static
FORCE_INLINE
int32_t
streamTaskInput
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
while
(
1
)
{
int8_t
inputStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__NORMAL
,
TASK_INPUT_STATUS__PROCESSING
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
break
;
}
ASSERT
(
0
);
}
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
((
SStreamDataSubmit
*
)
pItem
);
if
(
pSubmitClone
==
NULL
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
return
-
1
;
}
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pSubmitClone
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
}
// TODO: back pressure
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__NORMAL
);
return
0
;
}
static
FORCE_INLINE
void
streamTaskInputFail
(
SStreamTask
*
pTask
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
}
static
FORCE_INLINE
int32_t
streamTaskOutput
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
pBlock
)
{
taosWriteQitem
(
pTask
->
outputQueue
->
queue
,
pBlock
);
return
0
;
}
typedef
struct
{
int32_t
reserved
;
...
...
@@ -289,6 +352,11 @@ typedef struct {
SArray
*
data
;
// SArray<SSDataBlock>
}
SStreamTaskExecReq
;
typedef
struct
{
// SMsgHead head;
SStreamTask
*
task
;
}
SStreamTaskDeployReq
;
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
);
void
*
tDecodeSStreamTaskExecReq
(
const
void
*
buf
,
SStreamTaskExecReq
*
pReq
);
void
tFreeSStreamTaskExecReq
(
SStreamTaskExecReq
*
pReq
);
...
...
@@ -297,6 +365,12 @@ typedef struct {
int32_t
reserved
;
}
SStreamTaskExecRsp
;
typedef
struct
{
SMsgHead
head
;
int64_t
streamId
;
int32_t
taskId
;
}
SStreamTaskRunReq
;
typedef
struct
{
// SMsgHead head;
int64_t
streamId
;
...
...
@@ -304,21 +378,18 @@ typedef struct {
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
typedef
struct
{
SMsgHead
head
;
int64_t
streamId
;
int32_t
taskId
;
}
SStreamTaskRunReq
;
typedef
struct
{
int64_t
streamId
;
int32_t
taskId
;
int32_t
sourceTaskId
;
int32_t
sourceVg
;
int32_t
sourceChildId
;
#if 0
int64_t sourceVer;
#endif
SArray
*
data
;
// SArray<SSDataBlock>
int32_t
blockNum
;
SArray
*
dataLen
;
// SArray<int32_t>
SArray
*
data
;
// SArray<SRetrieveTableRsp*>
}
SStreamDispatchReq
;
typedef
struct
{
...
...
@@ -340,6 +411,8 @@ typedef struct {
int8_t
inputStatus
;
}
SStreamTaskRecoverRsp
;
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
);
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
...
...
@@ -356,4 +429,4 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
}
#endif
#endif
/* ifndef _
T
STREAM_H_ */
#endif
/* ifndef _STREAM_H_ */
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
edb9da84
...
...
@@ -190,7 +190,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -230,7 +230,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
int32_t
mndAddFixedSinkToStream
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SStreamObj
*
pStream
)
{
ASSERT
(
pStream
->
fixedSinkVgId
!=
0
);
SArray
*
tasks
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -322,7 +322,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
// source part
pTask
->
sourceType
=
TASK_SOURCE__SCAN
;
pTask
->
inputType
=
TASK_INPUT_TYPE__SUMBIT_BLOCK
;
...
...
@@ -387,7 +387,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// else, assign to vnode
ASSERT
(
plan
->
subplanType
==
SUBPLAN_TYPE_MERGE
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
// source part, currently only support multi source
pTask
->
sourceType
=
TASK_SOURCE__PIPE
;
...
...
@@ -477,7 +477,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
);
SStreamTask
*
pTask
=
tNewSStreamTask
(
pStream
->
uid
,
0
);
// source part
pTask
->
sourceType
=
TASK_SOURCE__MERGE
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
edb9da84
...
...
@@ -78,10 +78,10 @@ typedef struct {
tmr_h
timerId
;
int8_t
tmrStopped
;
// exec
int8_t
inputStatus
;
int8_t
execStatus
;
SStreamQ
inputQ
;
SRWLatch
lock
;
int8_t
inputStatus
;
int8_t
execStatus
;
SStreamQ
ueue
inputQ
;
SRWLatch
lock
;
}
STqPushHandle
;
// tqExec
...
...
@@ -107,7 +107,7 @@ typedef struct {
STqExecCol
execCol
;
STqExecTb
execTb
;
STqExecDb
execDb
;
}
exec
;
};
}
STqExecHandle
;
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
edb9da84
...
...
@@ -266,7 +266,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pHandle
->
execHandle
.
exec
.
exec
Col
.
qmsg
=
req
.
qmsg
;
pHandle
->
execHandle
.
execCol
.
qmsg
=
req
.
qmsg
;
req
.
qmsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
handle
=
{
...
...
@@ -274,15 +274,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pHandle
->
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
exec
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
exec
.
execCol
.
task
[
i
]);
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
}
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
pHandle
->
execHandle
.
exec
.
exec
Db
.
pFilterOutTbUid
=
pHandle
->
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pHandle
->
execHandle
.
exec
.
exec
Tb
.
suid
=
req
.
suid
;
pHandle
->
execHandle
.
execTb
.
suid
=
req
.
suid
;
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
tsdbGetCtbIdList
(
pTq
->
pVnode
->
pMeta
,
req
.
suid
,
tbUidList
);
tqDebug
(
"vg %d, tq try get suid: %ld"
,
pTq
->
pVnode
->
config
.
vgId
,
req
.
suid
);
...
...
@@ -296,17 +295,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
taosArrayDestroy
(
tbUidList
);
}
taosHashPut
(
pTq
->
handles
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
}
}
else
{
/*ASSERT(pExec->consumerId == req.oldConsumerId);*/
// TODO handle qmsg and exec modification
atomic_store_32
(
&
pHandle
->
epoch
,
-
1
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
}
}
if
(
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
}
return
0
;
}
...
...
@@ -323,16 +325,13 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear
(
&
decoder
);
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputQueue
=
streamQueueOpen
();
pTask
->
outputQueue
=
streamQueueOpen
();
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
inputQ
=
taosOpenQueue
();
pTask
->
outputQ
=
taosOpenQueue
();
pTask
->
inputQAll
=
taosAllocateQall
();
pTask
->
outputQAll
=
taosAllocateQall
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
if
(
pTask
->
inputQueue
==
NULL
||
pTask
->
outputQueue
==
NULL
)
goto
FAIL
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
...
...
@@ -369,10 +368,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
return
0
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
->
inputQueue
)
streamQueueClose
(
pTask
->
inputQueue
);
if
(
pTask
->
outputQueue
)
streamQueueClose
(
pTask
->
outputQueue
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
-
1
;
}
...
...
@@ -393,38 +390,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
pTask
->
inputType
!=
STREAM_INPUT__DATA_SUBMIT
)
continue
;
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
if
(
failed
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
if
(
!
failed
)
{
if
(
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
pSubmit
)
<
0
)
{
continue
;
}
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
(
pSubmit
);
if
(
pSubmitClone
==
NULL
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
if
(
streamTriggerByWrite
(
pTask
,
pTq
->
pVnode
->
config
.
vgId
,
&
pTq
->
pVnode
->
msgCb
)
<
0
)
{
continue
;
}
taosWriteQitem
(
pTask
->
inputQ
,
pSubmitClone
);
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pRunReq
=
taosMemoryMalloc
(
sizeof
(
SStreamTaskRunReq
));
if
(
pRunReq
==
NULL
)
continue
;
// TODO: do we need htonl?
pRunReq
->
head
.
vgId
=
pTq
->
pVnode
->
config
.
vgId
;
pRunReq
->
streamId
=
pTask
->
streamId
;
pRunReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_TASK_RUN
,
.
pCont
=
pRunReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
msg
);
}
}
else
{
// blocked or stopped, do nothing
streamTaskInputFail
(
pTask
);
}
}
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
edb9da84
...
...
@@ -59,7 +59,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
)
{
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
qTaskInfo_t
task
=
pExec
->
exec
.
exec
Col
.
task
[
workerId
];
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
ASSERT
(
task
);
qSetStreamInput
(
task
,
pReq
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
false
);
while
(
1
)
{
...
...
@@ -101,7 +101,7 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
pRsp
->
withSchema
=
1
;
STqReadHandle
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqReadHandleSetMsg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlockFilterOut
(
pReader
,
pExec
->
exec
.
exec
Db
.
pFilterOutTbUid
))
{
while
(
tqNextDataBlockFilterOut
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
SSDataBlock
block
=
{
0
};
if
(
tqRetrieveDataBlock
(
&
block
.
pDataBlock
,
pReader
,
&
block
.
info
.
groupId
,
&
block
.
info
.
uid
,
&
block
.
info
.
rows
,
&
block
.
info
.
numOfCols
)
<
0
)
{
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
edb9da84
...
...
@@ -22,7 +22,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if
(
tEncodeI32
(
pEncoder
,
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
execHandle
.
exec
.
exec
Col
.
qmsg
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
execHandle
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndEncode
(
pEncoder
);
return
pEncoder
->
pos
;
...
...
@@ -35,7 +35,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if
(
tDecodeI32
(
pDecoder
,
&
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pHandle
->
execHandle
.
exec
.
exec
Col
.
qmsg
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pHandle
->
execHandle
.
execCol
.
qmsg
)
<
0
)
return
-
1
;
}
tEndDecode
(
pDecoder
);
return
0
;
...
...
@@ -88,12 +88,11 @@ int32_t tqMetaOpen(STQ* pTq) {
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
exec
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]);
handle
.
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
execCol
.
task
[
i
]);
}
}
else
{
handle
.
execHandle
.
exec
.
exec
Db
.
pFilterOutTbUid
=
handle
.
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
handles
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
edb9da84
...
...
@@ -29,13 +29,13 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
}
// update processed
atomic_store_64
(
&
pHandle
->
pushHandle
.
processedVer
,
pSubmit
->
ver
);
streamQ
Set
Success
(
&
pHandle
->
pushHandle
.
inputQ
);
streamQ
ueueProcess
Success
(
&
pHandle
->
pushHandle
.
inputQ
);
streamDataSubmitRefDec
(
pSubmit
);
if
(
pRsp
->
blockNum
>
0
)
{
*
ppSubmit
=
pSubmit
;
return
0
;
}
else
{
pSubmit
=
streamQNextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
pSubmit
=
streamQ
ueue
NextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
}
}
*
ppSubmit
=
pSubmit
;
...
...
@@ -52,14 +52,14 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
pSubmit
=
streamQNextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
pSubmit
=
streamQ
ueue
NextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
while
(
pHandle
->
pushHandle
.
processedVer
<=
pSubmit
->
ver
)
{
// read from wal
}
while
(
pHandle
->
pushHandle
.
processedVer
>
pSubmit
->
ver
+
1
)
{
streamQ
Set
Success
(
&
pHandle
->
pushHandle
.
inputQ
);
streamQ
ueueProcess
Success
(
&
pHandle
->
pushHandle
.
inputQ
);
streamDataSubmitRefDec
(
pSubmit
);
pSubmit
=
streamQNextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
pSubmit
=
streamQ
ueue
NextItem
(
&
pHandle
->
pushHandle
.
inputQ
);
if
(
pSubmit
==
NULL
)
break
;
}
// 3. exec, after each success, update processed ver
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
edb9da84
...
...
@@ -307,7 +307,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
STqHandle
*
pExec
=
(
STqHandle
*
)
pIter
;
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
execHandle
.
exec
.
exec
Col
.
task
[
i
],
tbUidList
,
isAdd
);
int32_t
code
=
qUpdateQualifiedTableId
(
pExec
->
execHandle
.
execCol
.
task
[
i
],
tbUidList
,
isAdd
);
ASSERT
(
code
==
0
);
}
}
else
if
(
pExec
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
...
...
@@ -315,7 +315,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t
sz
=
taosArrayGetSize
(
tbUidList
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pExec
->
execHandle
.
exec
.
exec
Db
.
pFilterOutTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
taosHashPut
(
pExec
->
execHandle
.
execDb
.
pFilterOutTbUid
,
&
tbUid
,
sizeof
(
int64_t
),
NULL
,
0
);
}
}
}
else
{
...
...
source/libs/stream/inc/
t
streamInc.h
→
source/libs/stream/inc/streamInc.h
浏览文件 @
edb9da84
...
...
@@ -13,15 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
#include "executor.h"
#include "tstream.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamSink1
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
#ifdef __cplusplus
}
#endif
#endif
/* ifndef _
TSTREAM
_H_ */
#endif
/* ifndef _
STREAM_INC
_H_ */
source/libs/stream/src/stream.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
int32_t
streamTriggerByWrite
(
SStreamTask
*
pTask
,
int32_t
vgId
,
SMsgCb
*
pMsgCb
)
{
int8_t
execStatus
=
atomic_load_8
(
&
pTask
->
status
);
if
(
execStatus
==
TASK_STATUS__IDLE
||
execStatus
==
TASK_STATUS__CLOSING
)
{
SStreamTaskRunReq
*
pRunReq
=
rpcMallocCont
(
sizeof
(
SStreamTaskRunReq
));
if
(
pRunReq
==
NULL
)
return
-
1
;
// TODO: do we need htonl?
pRunReq
->
head
.
vgId
=
vgId
;
pRunReq
->
streamId
=
pTask
->
streamId
;
pRunReq
->
taskId
=
pTask
->
taskId
;
SRpcMsg
msg
=
{
.
msgType
=
TDMT_VND_TASK_RUN
,
.
pCont
=
pRunReq
,
.
contLen
=
sizeof
(
SStreamTaskRunReq
),
};
tmsgPutToQueue
(
pMsgCb
,
FETCH_QUEUE
,
&
msg
);
}
return
0
;
}
#if 1
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
SStreamDataBlock
*
pBlock
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
int8_t
status
;
// enqueue
if
(
pBlock
!=
NULL
)
{
pBlock
->
type
=
STREAM_DATA_TYPE_SSDATA_BLOCK
;
pBlock
->
sourceVg
=
pReq
->
sourceVg
;
pBlock
->
blocks
=
pReq
->
data
;
/*pBlock->sourceVer = pReq->sourceVer;*/
if
(
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
pBlock
)
==
0
)
{
status
=
TASK_INPUT_STATUS__NORMAL
;
}
else
{
status
=
TASK_INPUT_STATUS__FAILED
;
}
}
else
{
streamTaskInputFail
(
pTask
);
status
=
TASK_INPUT_STATUS__FAILED
;
}
// rsp by input status
SStreamDispatchRsp
*
pCont
=
rpcMallocCont
(
sizeof
(
SStreamDispatchRsp
));
pCont
->
inputStatus
=
status
;
pCont
->
streamId
=
pReq
->
streamId
;
pCont
->
taskId
=
pReq
->
sourceTaskId
;
pRsp
->
pCont
=
pCont
;
pRsp
->
contLen
=
sizeof
(
SStreamDispatchRsp
);
tmsgSendRsp
(
pRsp
);
return
status
==
TASK_INPUT_STATUS__NORMAL
?
0
:
-
1
;
}
#endif
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
// 1. handle input
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamExec
(
pTask
,
pMsgCb
);
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
streamSink1
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
}
// continue dispatch
streamSink1
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
streamExec
(
pTask
,
pMsgCb
);
streamSink1
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
//
return
0
;
}
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
//
return
0
;
}
source/libs/stream/src/streamData.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pOutput->type);
tlen += taosEncodeFixedI32(buf, pOutput->sourceVg);
tlen += taosEncodeFixedI64(buf, pOutput->sourceVer);
ASSERT(pOutput->type == STREAM_INPUT__DATA_BLOCK);
tlen += tEncodeDataBlocks(buf, pOutput->blocks);
return tlen;
}
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
buf = taosDecodeFixedI8(buf, &pInput->type);
buf = taosDecodeFixedI32(buf, &pInput->sourceVg);
buf = taosDecodeFixedI64(buf, &pInput->sourceVer);
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
buf = tDecodeDataBlocks(buf, &pInput->blocks);
return (void*)buf;
}
#endif
SStreamDataSubmit
*
streamDataSubmitNew
(
SSubmitReq
*
pReq
)
{
SStreamDataSubmit
*
pDataSubmit
=
(
SStreamDataSubmit
*
)
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pDataSubmit
==
NULL
)
return
NULL
;
pDataSubmit
->
dataRef
=
(
int32_t
*
)
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pDataSubmit
->
dataRef
==
NULL
)
goto
FAIL
;
pDataSubmit
->
data
=
pReq
;
*
pDataSubmit
->
dataRef
=
1
;
pDataSubmit
->
type
=
STREAM_INPUT__DATA_SUBMIT
;
return
pDataSubmit
;
FAIL:
taosFreeQitem
(
pDataSubmit
);
return
NULL
;
}
static
FORCE_INLINE
void
streamDataSubmitRefInc
(
SStreamDataSubmit
*
pDataSubmit
)
{
//
atomic_add_fetch_32
(
pDataSubmit
->
dataRef
,
1
);
}
SStreamDataSubmit
*
streamSubmitRefClone
(
SStreamDataSubmit
*
pSubmit
)
{
SStreamDataSubmit
*
pSubmitClone
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmitClone
==
NULL
)
{
return
NULL
;
}
streamDataSubmitRefInc
(
pSubmit
);
memcpy
(
pSubmitClone
,
pSubmit
,
sizeof
(
SStreamDataSubmit
));
return
pSubmitClone
;
}
#if 0
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->streamId);
tlen += taosEncodeFixedI32(buf, pReq->taskId);
tlen += tEncodeDataBlocks(buf, pReq->data);
return tlen;
}
void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->streamId);
buf = taosDecodeFixedI32(buf, &pReq->taskId);
buf = tDecodeDataBlocks(buf, &pReq->data);
return (void*)buf;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); }
#endif
source/libs/stream/src/streamExec.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "tstream.h"
static
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
ASSERT
(
pSubmit
->
type
==
STREAM_INPUT__DATA_SUBMIT
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
false
);
}
else
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_BLOCK
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
// exec
while
(
1
)
{
SSDataBlock
*
output
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
// TODO: do we need free memory?
SSDataBlock
*
outputCopy
=
createOneDataBlock
(
output
,
true
);
taosArrayPush
(
pRes
,
outputCopy
);
}
return
0
;
}
static
SArray
*
streamExecForQall
(
SStreamTask
*
pTask
,
SArray
*
pRes
)
{
while
(
1
)
{
void
*
data
=
streamQueueNextItem
(
pTask
->
inputQueue
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
qRes
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
if
(
qRes
==
NULL
)
{
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
return
NULL
;
}
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
if
(
streamTaskOutput
(
pTask
,
qRes
)
<
0
)
{
streamQueueProcessFail
(
pTask
->
inputQueue
);
taosArrayDestroy
(
pRes
);
taosFreeQitem
(
qRes
);
return
NULL
;
}
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
streamDataSubmitRefDec
((
SStreamDataSubmit
*
)
data
);
taosFreeQitem
(
data
);
}
else
{
taosArrayDestroyEx
(((
SStreamDataBlock
*
)
data
)
->
blocks
,
(
FDelete
)
tDeleteSSDataBlock
);
taosFreeQitem
(
data
);
}
streamQueueProcessSuccess
(
pTask
->
inputQueue
);
return
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
}
return
pRes
;
}
// TODO: handle version
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
// set status closing
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
// second run, make sure inputQ and qall are cleared
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
break
;
}
else
if
(
execStatus
==
TASK_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_STATUS__EXECUTING
)
{
break
;
}
else
{
ASSERT
(
0
);
}
}
if
(
pRes
)
taosArrayDestroy
(
pRes
);
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
return
0
;
FAIL:
if
(
pRes
)
taosArrayDestroy
(
pRes
);
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
return
-
1
;
}
source/libs/stream/src/streamMsg.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
int32_t
tEncodeStreamDispatchReq
(
SEncoder
*
pEncoder
,
const
SStreamDispatchReq
*
pReq
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
blockNum
)
<
0
)
return
-
1
;
ASSERT
(
taosArrayGetSize
(
pReq
->
data
)
==
pReq
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pReq
->
dataLen
)
==
pReq
->
blockNum
);
for
(
int32_t
i
=
0
;
i
<
pReq
->
blockNum
;
i
++
)
{
int32_t
len
=
*
(
int32_t
*
)
taosArrayGet
(
pReq
->
dataLen
,
i
);
void
*
data
=
taosArrayGetP
(
pReq
->
data
,
i
);
if
(
tEncodeI32
(
pEncoder
,
len
)
<
0
)
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
data
,
len
)
<
0
)
return
-
1
;
}
tEndEncode
(
pEncoder
);
return
0
;
}
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceVg
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
blockNum
)
<
0
)
return
-
1
;
ASSERT
(
pReq
->
blockNum
>
0
);
pReq
->
data
=
taosArrayInit
(
pReq
->
blockNum
,
sizeof
(
void
*
));
pReq
->
dataLen
=
taosArrayInit
(
pReq
->
blockNum
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
pReq
->
blockNum
;
i
++
)
{
int32_t
len1
;
uint64_t
len2
;
void
*
data
;
if
(
tDecodeI32
(
pDecoder
,
&
len1
)
<
0
)
return
-
1
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
&
data
,
&
len2
)
<
0
)
return
-
1
;
ASSERT
(
len1
==
len2
);
taosArrayPush
(
pReq
->
dataLen
,
&
len1
);
taosArrayPush
(
pReq
->
data
,
&
data
);
}
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
streamBuildDispatchMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamDispatchReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
data
=
data
,
};
return
0
;
}
static
int32_t
streamBuildExecMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
data
=
data
,
};
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSStreamTaskExecReq
(
NULL
,
&
req
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
0
;
req
.
taskId
=
pTask
->
inplaceDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTask
->
fixedEpDispatcher
.
nodeId
);
*
ppEpSet
=
&
pTask
->
fixedEpDispatcher
.
epSet
;
req
.
taskId
=
pTask
->
fixedEpDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
// TODO use general name rule of schemaless
char
ctbName
[
TSDB_TABLE_FNAME_LEN
+
22
]
=
{
0
};
// all groupId must be the same in an array
SSDataBlock
*
pBlock
=
taosArrayGet
(
data
,
0
);
sprintf
(
ctbName
,
"%s:%ld"
,
pTask
->
shuffleDispatcher
.
stbFullName
,
pBlock
->
info
.
groupId
);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
// get node
// TODO: optimize search process
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
vgInfo
);
int32_t
nodeId
=
0
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
i
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
nodeId
=
pVgInfo
->
vgId
;
req
.
taskId
=
pVgInfo
->
taskId
;
*
ppEpSet
=
&
pVgInfo
->
epSet
;
break
;
}
}
ASSERT
(
nodeId
!=
0
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
nodeId
);
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSStreamTaskExecReq
(
&
abuf
,
&
req
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
msgType
=
pTask
->
dispatchMsgType
;
pMsg
->
info
.
noResp
=
1
;
return
0
;
}
static
int32_t
streamShuffleDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SHashObj
*
data
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
data
,
pIter
);
if
(
pIter
==
NULL
)
return
0
;
SArray
*
pData
=
*
(
SArray
**
)
pIter
;
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
;
if
(
streamBuildExecMsg
(
pTask
,
pData
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
return
0
;
}
source/libs/stream/src/streamQueue.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
SStreamQueue
*
streamQueueOpen
()
{
SStreamQueue
*
pQueue
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamQueue
));
if
(
pQueue
==
NULL
)
return
NULL
;
pQueue
->
queue
=
taosOpenQueue
();
pQueue
->
qall
=
taosAllocateQall
();
if
(
pQueue
->
queue
==
NULL
||
pQueue
->
qall
==
NULL
)
{
goto
FAIL
;
}
pQueue
->
status
=
STREAM_QUEUE__SUCESS
;
return
pQueue
;
FAIL:
if
(
pQueue
->
queue
)
taosCloseQueue
(
pQueue
->
queue
);
if
(
pQueue
->
qall
)
taosFreeQall
(
pQueue
->
qall
);
taosMemoryFree
(
pQueue
);
return
NULL
;
}
void
streamQueueClose
(
SStreamQueue
*
queue
)
{
while
(
1
)
{
void
*
qItem
=
streamQueueNextItem
(
queue
);
if
(
qItem
)
taosFreeQitem
(
qItem
);
else
return
;
}
}
source/libs/stream/src/streamSink.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "tstream.h"
int32_t
streamSink1
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SStreamQueue
*
queue
;
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
{
queue
=
pTask
->
inputQueue
;
}
else
{
queue
=
pTask
->
outputQueue
;
}
/*if (streamDequeueBegin(queue) == true) {*/
/*return -1;*/
/*}*/
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
||
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
while
(
1
)
{
SStreamDataBlock
*
pBlock
=
streamQueueNextItem
(
queue
);
if
(
pBlock
==
NULL
)
break
;
ASSERT
(
pBlock
->
type
==
STREAM_DATA_TYPE_SSDATA_BLOCK
);
// local sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pBlock
->
blocks
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pBlock
->
blocks
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
ASSERT
(
queue
==
pTask
->
outputQueue
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
ASSERT
(
queue
==
pTask
->
outputQueue
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
ASSERT
(
queue
==
pTask
->
outputQueue
);
}
streamQueueProcessSuccess
(
queue
);
}
}
return
0
;
}
#if 0
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
bool firstRun = 1;
while (1) {
SStreamDataBlock* pBlock = NULL;
if (!firstRun) {
taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
}
taosGetQitem(pTask->outputQAll, (void**)&pBlock);
if (pBlock == NULL) {
if (firstRun) {
firstRun = 0;
continue;
} else {
break;
}
}
SArray* pRes = pBlock->blocks;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
// blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
// dispatch
// TODO dispatch guard
int8_t outputStatus = atomic_load_8(&pTask->outputStatus);
if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) {
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
qType = FETCH_QUEUE;
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/
/*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/
/*qType = MERGE_QUEUE;*/
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/
/*qType = WRITE_QUEUE;*/
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pShuffleRes == NULL) {
return -1;
}
int32_t sz = taosArrayGetSize(pRes);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
if (pArray == NULL) {
pArray = taosArrayInit(0, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
}
taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
}
taosArrayPush(pArray, pDataBlock);
}
if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
return -1;
}
} else {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
}
}
}
return 0;
}
#endif
source/libs/stream/src/streamTask.c
0 → 100644
浏览文件 @
edb9da84
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "tstream.h"
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
,
int32_t
childId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
childId
=
childId
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
return
pTask
;
}
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
childId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
tbSink
.
stbUid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
tbSink
.
stbFullName
)
<
0
)
return
-
1
;
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
inplaceDispatcher
.
taskId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
}
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
childId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
tbSink
.
stbUid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pTask
->
tbSink
.
stbFullName
)
<
0
)
return
-
1
;
pTask
->
tbSink
.
pSchemaWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pTask
->
tbSink
.
pSchemaWrapper
==
NULL
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
taskId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
}
/*tEndDecode(pDecoder);*/
return
0
;
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
streamQueueClose
(
pTask
->
inputQueue
);
streamQueueClose
(
pTask
->
outputQueue
);
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
qDestroyTask
(
pTask
->
exec
.
executor
);
taosMemoryFree
(
pTask
);
}
source/libs/stream/src/
t
streamUpdate.c
→
source/libs/stream/src/streamUpdate.c
浏览文件 @
edb9da84
文件已移动
source/libs/stream/src/tstream.c
已删除
100644 → 0
浏览文件 @
6de91932
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#include "executor.h"
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pOutput
->
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pOutput
->
sourceVg
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pOutput
->
sourceVer
);
ASSERT
(
pOutput
->
type
==
STREAM_INPUT__DATA_BLOCK
);
tlen
+=
tEncodeDataBlocks
(
buf
,
pOutput
->
blocks
);
return
tlen
;
}
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pInput
->
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pInput
->
sourceVg
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pInput
->
sourceVer
);
ASSERT
(
pInput
->
type
==
STREAM_INPUT__DATA_BLOCK
);
buf
=
tDecodeDataBlocks
(
buf
,
&
pInput
->
blocks
);
return
(
void
*
)
buf
;
}
SStreamDataSubmit
*
streamSubmitRefClone
(
SStreamDataSubmit
*
pSubmit
)
{
SStreamDataSubmit
*
pSubmitClone
=
taosAllocateQitem
(
sizeof
(
SStreamDataSubmit
),
DEF_QITEM
);
if
(
pSubmitClone
==
NULL
)
{
return
NULL
;
}
streamDataSubmitRefInc
(
pSubmit
);
memcpy
(
pSubmitClone
,
pSubmit
,
sizeof
(
SStreamDataSubmit
));
return
pSubmitClone
;
}
static
int32_t
streamBuildDispatchMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamDispatchReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
data
=
data
,
};
return
0
;
}
static
int32_t
streamBuildExecMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
.
data
=
data
,
};
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSStreamTaskExecReq
(
NULL
,
&
req
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
0
;
req
.
taskId
=
pTask
->
inplaceDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pTask
->
fixedEpDispatcher
.
nodeId
);
*
ppEpSet
=
&
pTask
->
fixedEpDispatcher
.
epSet
;
req
.
taskId
=
pTask
->
fixedEpDispatcher
.
taskId
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
// TODO use general name rule of schemaless
char
ctbName
[
TSDB_TABLE_FNAME_LEN
+
22
]
=
{
0
};
// all groupId must be the same in an array
SSDataBlock
*
pBlock
=
taosArrayGet
(
data
,
0
);
sprintf
(
ctbName
,
"%s:%ld"
,
pTask
->
shuffleDispatcher
.
stbFullName
,
pBlock
->
info
.
groupId
);
// TODO: get hash function by hashMethod
// get groupId, compute hash value
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
// get node
// TODO: optimize search process
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
vgInfo
);
int32_t
nodeId
=
0
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
i
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
nodeId
=
pVgInfo
->
vgId
;
req
.
taskId
=
pVgInfo
->
taskId
;
*
ppEpSet
=
&
pVgInfo
->
epSet
;
break
;
}
}
ASSERT
(
nodeId
!=
0
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
nodeId
);
}
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSStreamTaskExecReq
(
&
abuf
,
&
req
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
pMsg
->
msgType
=
pTask
->
dispatchMsgType
;
pMsg
->
info
.
noResp
=
1
;
return
0
;
}
static
int32_t
streamShuffleDispatch
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SHashObj
*
data
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
data
,
pIter
);
if
(
pIter
==
NULL
)
return
0
;
SArray
*
pData
=
*
(
SArray
**
)
pIter
;
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
;
if
(
streamBuildExecMsg
(
pTask
,
pData
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
return
0
;
}
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
)
{
ASSERT
(
pTask
->
inputType
==
TASK_INPUT_TYPE__SUMBIT_BLOCK
);
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
streamDataSubmitRefInc
(
input
);
taosWriteQitem
(
pTask
->
inputQ
,
input
);
}
return
inputStatus
;
}
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
)
{
ASSERT
(
pTask
->
inputType
==
TASK_INPUT_TYPE__DATA_BLOCK
);
taosWriteQitem
(
pTask
->
inputQ
,
input
);
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
return
inputStatus
;
}
static
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
ASSERT
(
pSubmit
->
type
==
STREAM_INPUT__DATA_SUBMIT
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
false
);
}
else
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_BLOCK
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
ASSERT
(
pBlock
->
type
==
STREAM_INPUT__DATA_BLOCK
);
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
false
);
}
// exec
while
(
1
)
{
SSDataBlock
*
output
=
NULL
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
// TODO: do we need free memory?
SSDataBlock
*
outputCopy
=
createOneDataBlock
(
output
,
true
);
taosArrayPush
(
pRes
,
outputCopy
);
}
// destroy
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
streamDataSubmitRefDec
((
SStreamDataSubmit
*
)
data
);
taosFreeQitem
(
data
);
}
else
{
taosArrayDestroyEx
(((
SStreamDataBlock
*
)
data
)
->
blocks
,
(
FDelete
)
tDeleteSSDataBlock
);
taosFreeQitem
(
data
);
}
return
0
;
}
static
SArray
*
streamExecForQall
(
SStreamTask
*
pTask
,
SArray
*
pRes
)
{
while
(
1
)
{
void
*
data
=
NULL
;
taosGetQitem
(
pTask
->
inputQAll
,
&
data
);
if
(
data
==
NULL
)
break
;
streamTaskExecImpl
(
pTask
,
data
,
pRes
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SStreamDataBlock
*
qRes
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
qRes
->
type
=
STREAM_INPUT__DATA_BLOCK
;
qRes
->
blocks
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
qRes
);
return
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
}
}
return
pRes
;
}
// TODO: handle version
int32_t
streamExec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run, from qall, handle failure from last exec
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
// second run, from inputQ
taosReadAllQitems
(
pTask
->
inputQ
,
pTask
->
inputQAll
);
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
// set status closing
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
// third run, make sure inputQ and qall are cleared
taosReadAllQitems
(
pTask
->
inputQ
,
pTask
->
inputQAll
);
pRes
=
streamExecForQall
(
pTask
,
pRes
);
if
(
pRes
==
NULL
)
goto
FAIL
;
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
break
;
}
else
if
(
execStatus
==
TASK_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_STATUS__EXECUTING
)
{
break
;
}
else
{
ASSERT
(
0
);
}
}
return
0
;
FAIL:
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
);
return
-
1
;
}
int32_t
streamSink
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
bool
firstRun
=
1
;
while
(
1
)
{
SStreamDataBlock
*
pBlock
=
NULL
;
if
(
!
firstRun
)
{
taosReadAllQitems
(
pTask
->
outputQ
,
pTask
->
outputQAll
);
}
taosGetQitem
(
pTask
->
outputQAll
,
(
void
**
)
&
pBlock
);
if
(
pBlock
==
NULL
)
{
if
(
firstRun
)
{
firstRun
=
0
;
continue
;
}
else
{
break
;
}
}
SArray
*
pRes
=
pBlock
->
blocks
;
// sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
// blockDebugShowData(pRes);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pRes
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
//
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
// dispatch
// TODO dispatch guard
int8_t
outputStatus
=
atomic_load_8
(
&
pTask
->
outputStatus
);
if
(
outputStatus
==
TASK_OUTPUT_STATUS__NORMAL
)
{
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_DISPATCH
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_DISPATCH
)
{
qType
=
FETCH_QUEUE
;
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/
/*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/
/*qType = MERGE_QUEUE;*/
/*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/
/*qType = WRITE_QUEUE;*/
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
}
}
return
0
;
}
int32_t
streamTaskEnqueue
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
SStreamDataBlock
*
pBlock
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
int8_t
status
;
// 1.1 update status
// TODO cal backpressure
if
(
pBlock
==
NULL
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
status
=
TASK_INPUT_STATUS__FAILED
;
}
else
{
status
=
atomic_load_8
(
&
pTask
->
inputStatus
);
}
// 1.2 enqueue
pBlock
->
type
=
STREAM_DATA_TYPE_SSDATA_BLOCK
;
pBlock
->
sourceVg
=
pReq
->
sourceVg
;
/*pBlock->sourceVer = pReq->sourceVer;*/
taosWriteQitem
(
pTask
->
inputQ
,
pBlock
);
// 1.3 rsp by input status
SStreamDispatchRsp
*
pCont
=
rpcMallocCont
(
sizeof
(
SStreamDispatchRsp
));
pCont
->
inputStatus
=
status
;
pCont
->
streamId
=
pReq
->
streamId
;
pCont
->
taskId
=
pReq
->
sourceTaskId
;
pRsp
->
pCont
=
pCont
;
pRsp
->
contLen
=
sizeof
(
SStreamDispatchRsp
);
tmsgSendRsp
(
pRsp
);
return
0
;
}
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
// 1. handle input
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamExec
(
pTask
,
pMsgCb
);
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
}
// continue dispatch
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
streamExec
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
//
return
0
;
}
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
//
return
0
;
}
int32_t
tEncodeStreamDispatchReq
(
SEncoder
*
pEncoder
,
const
SStreamDispatchReq
*
pReq
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pReq
->
sourceVg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
return
0
;
}
int32_t
tDecodeStreamDispatchReq
(
SDecoder
*
pDecoder
,
SStreamDispatchReq
*
pReq
)
{
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pReq
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceTaskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
sourceVg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
return
0
;
}
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
streamId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
taskId
);
tlen
+=
tEncodeDataBlocks
(
buf
,
pReq
->
data
);
return
tlen
;
}
void
*
tDecodeSStreamTaskExecReq
(
const
void
*
buf
,
SStreamTaskExecReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
streamId
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
taskId
);
buf
=
tDecodeDataBlocks
(
buf
,
&
pReq
->
data
);
return
(
void
*
)
buf
;
}
void
tFreeSStreamTaskExecReq
(
SStreamTaskExecReq
*
pReq
)
{
taosArrayDestroy
(
pReq
->
data
);
}
SStreamTask
*
tNewSStreamTask
(
int64_t
streamId
)
{
SStreamTask
*
pTask
=
(
SStreamTask
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
NULL
;
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
status
=
TASK_STATUS__IDLE
;
return
pTask
;
}
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if
(
tEncodeI64
(
pEncoder
,
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
tbSink
.
stbUid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
tbSink
.
stbFullName
)
<
0
)
return
-
1
;
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tEncodeI64
(
pEncoder
,
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tEncodeI8
(
pEncoder
,
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
inplaceDispatcher
.
taskId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
if
(
tSerializeSUseDbRspImp
(
pEncoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
/*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
}
/*tEndEncode(pEncoder);*/
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamTask
(
SDecoder
*
pDecoder
,
SStreamTask
*
pTask
)
{
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
streamId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
inputType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sourceType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
execType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
sinkType
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
dispatchType
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pTask
->
dispatchMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
exec
.
parallelizable
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
exec
.
qmsg
)
<
0
)
return
-
1
;
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
tbSink
.
stbUid
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pTask
->
tbSink
.
stbFullName
)
<
0
)
return
-
1
;
pTask
->
tbSink
.
pSchemaWrapper
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pTask
->
tbSink
.
pSchemaWrapper
==
NULL
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
pTask
->
tbSink
.
pSchemaWrapper
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pTask
->
smaSink
.
smaId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
fetchSink
.
reserved
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
inplaceDispatcher
.
taskId
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
fixedEpDispatcher
.
epSet
)
<
0
)
return
-
1
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
/*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/
if
(
tDeserializeSUseDbRspImp
(
pDecoder
,
&
pTask
->
shuffleDispatcher
.
dbInfo
)
<
0
)
return
-
1
;
}
/*tEndDecode(pDecoder);*/
return
0
;
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
taosCloseQueue
(
pTask
->
inputQ
);
taosCloseQueue
(
pTask
->
outputQ
);
// TODO
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
qDestroyTask
(
pTask
->
exec
.
executor
);
taosMemoryFree
(
pTask
);
}
#if 0
int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
/*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/
return pEncoder->size;
}
int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) {
return pEncoder->size;
}
void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) {
taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock);
}
#endif
source/util/src/tqueue.c
浏览文件 @
edb9da84
...
...
@@ -34,11 +34,11 @@ typedef struct STaosQnode {
}
STaosQnode
;
typedef
struct
STaosQueue
{
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
STaosQset
*
qset
;
// for queue set
void
*
ahandle
;
// for queue set
FItem
itemFp
;
FItems
itemsFp
;
TdThreadMutex
mutex
;
...
...
@@ -47,8 +47,8 @@ typedef struct STaosQueue {
}
STaosQueue
;
typedef
struct
STaosQset
{
STaosQueue
*
head
;
STaosQueue
*
current
;
STaosQueue
*
head
;
STaosQueue
*
current
;
TdThreadMutex
mutex
;
tsem_t
sem
;
int32_t
numOfQueues
;
...
...
@@ -86,7 +86,7 @@ void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
void
taosCloseQueue
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
;
STaosQnode
*
pTemp
;
STaosQset
*
qset
;
STaosQset
*
qset
;
taosThreadMutexLock
(
&
queue
->
mutex
);
STaosQnode
*
pNode
=
queue
->
head
;
...
...
@@ -282,6 +282,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
*
ppItem
=
pNode
->
item
;
num
=
1
;
uTrace
(
"item:%p is fetched"
,
*
ppItem
);
}
else
{
*
ppItem
=
NULL
;
}
return
num
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录