Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8b6820ff
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
8b6820ff
编写于
5月 15, 2022
作者:
L
Liu Jicong
提交者:
GitHub
5月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12489 from taosdata/feature/stream
refactor(stream)
上级
1e385277
a6015d2f
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
578 addition
and
200 deletion
+578
-200
include/common/tmsg.h
include/common/tmsg.h
+8
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+91
-8
include/os/osAtomic.h
include/os/osAtomic.h
+45
-45
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+141
-141
source/dnode/mnode/impl/test/trans/trans2.cpp
source/dnode/mnode/impl/test/trans/trans2.cpp
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+34
-0
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+257
-4
tests/script/tsim/tstream/basic1.sim
tests/script/tsim/tstream/basic1.sim
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
8b6820ff
...
...
@@ -2574,6 +2574,14 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx
(
pRsp
->
topics
,
(
void
(
*
)(
void
*
))
tDeleteSMqSubTopicEp
);
}
typedef
struct
{
void
*
data
;
}
SStreamDispatchReq
;
typedef
struct
{
int8_t
status
;
}
SStreamDispatchRsp
;
#define TD_AUTO_CREATE_TABLE 0x1
typedef
struct
{
int64_t
suid
;
...
...
include/libs/stream/tstream.h
浏览文件 @
8b6820ff
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
...
...
@@ -29,8 +30,22 @@ extern "C" {
typedef
struct
SStreamTask
SStreamTask
;
enum
{
STREAM_TASK_STATUS__RUNNING
=
1
,
STREAM_TASK_STATUS__STOP
,
TASK_STATUS__IDLE
=
1
,
TASK_STATUS__EXECUTING
,
TASK_STATUS__CLOSING
,
};
enum
{
TASK_INPUT_STATUS__NORMAL
=
1
,
TASK_INPUT_STATUS__BLOCKED
,
TASK_INPUT_STATUS__RECOVER
,
TASK_INPUT_STATUS__STOP
,
};
enum
{
TASK_OUTPUT_STATUS__NORMAL
=
1
,
TASK_OUTPUT_STATUS__WAIT
,
TASK_OUTPUT_STATUS__BLOCKED
,
};
enum
{
...
...
@@ -38,10 +53,64 @@ enum {
STREAM_CREATED_BY__SMA
,
};
enum
{
STREAM_INPUT__DATA_SUBMIT
=
1
,
STREAM_INPUT__DATA_BLOCK
,
STREAM_INPUT__CHECKPOINT
,
};
typedef
struct
{
int32_t
nodeId
;
// 0 for snode
SEpSet
epSet
;
}
SStreamTaskEp
;
int8_t
type
;
int32_t
sourceVg
;
int64_t
sourceVer
;
int32_t
*
dataRef
;
SSubmitReq
*
data
;
}
SStreamDataSubmit
;
typedef
struct
{
int8_t
type
;
int32_t
sourceVg
;
int64_t
sourceVer
;
SArray
*
blocks
;
// SArray<SSDataBlock*>
}
SStreamDataBlock
;
typedef
struct
{
int8_t
type
;
}
SStreamCheckpoint
;
static
FORCE_INLINE
SStreamDataSubmit
*
streamDataSubmitNew
(
SSubmitReq
*
pReq
)
{
SStreamDataSubmit
*
pDataSubmit
=
(
SStreamDataSubmit
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SStreamDataSubmit
));
if
(
pDataSubmit
==
NULL
)
return
NULL
;
pDataSubmit
->
data
=
pReq
;
pDataSubmit
->
dataRef
=
(
int32_t
*
)
taosMemoryMalloc
(
sizeof
(
int32_t
));
if
(
pDataSubmit
->
data
==
NULL
)
goto
FAIL
;
*
pDataSubmit
->
dataRef
=
1
;
return
pDataSubmit
;
FAIL:
taosMemoryFree
(
pDataSubmit
);
return
NULL
;
}
static
FORCE_INLINE
void
streamDataSubmitRefInc
(
SStreamDataSubmit
*
pDataSubmit
)
{
//
atomic_add_fetch_32
(
pDataSubmit
->
dataRef
,
1
);
}
static
FORCE_INLINE
void
streamDataSubmitRefDec
(
SStreamDataSubmit
*
pDataSubmit
)
{
int32_t
ref
=
atomic_sub_fetch_32
(
pDataSubmit
->
dataRef
,
1
);
ASSERT
(
ref
>=
0
);
if
(
ref
==
0
)
{
taosMemoryFree
(
pDataSubmit
->
data
);
taosMemoryFree
(
pDataSubmit
->
dataRef
);
}
}
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
);
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
);
typedef
struct
{
void
*
inputHandle
;
...
...
@@ -122,9 +191,15 @@ enum {
TASK_SINK__FETCH
,
};
enum
{
TASK_INPUT_TYPE__SUMBIT_BLOCK
=
1
,
TASK_INPUT_TYPE__DATA_BLOCK
,
};
struct
SStreamTask
{
int64_t
streamId
;
int32_t
taskId
;
int8_t
inputType
;
int8_t
status
;
int8_t
sourceType
;
...
...
@@ -155,9 +230,11 @@ struct SStreamTask {
STaskDispatcherShuffle
shuffleDispatcher
;
};
// msg buffer
int32_t
memUsed
;
int8_t
inputStatus
;
int8_t
outputStatus
;
STaosQueue
*
inputQ
;
STaosQueue
*
outputQ
;
// application storage
void
*
ahandle
;
...
...
@@ -199,10 +276,16 @@ typedef struct {
SArray
*
res
;
// SArray<SSDataBlock>
}
SStreamSinkReq
;
int32_t
streamEnqueueData
(
SStreamTask
*
pTask
,
const
void
*
input
,
int32_t
inputType
);
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
);
int32_t
streamTaskExecNew
(
SStreamTask
*
pTask
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
#ifdef __cplusplus
}
#endif
...
...
include/os/osAtomic.h
浏览文件 @
8b6820ff
...
...
@@ -23,27 +23,27 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following section.
#ifndef ALLOW_FORBID_FUNC
#define __atomic_load_n
__ATOMIC_LOAD_N_FUNC_TAOS_FORBID
#define __atomic_store_n
__ATOMIC_STORE_N_FUNC_TAOS_FORBID
#define __atomic_exchange_n
__ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
#define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
#define __atomic_add_fetch
__ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_add
__ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
#define __atomic_sub_fetch
__ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_sub
__ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
#define __atomic_and_fetch
__ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_and
__ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
#define __atomic_or_fetch
__ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_or
__ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
#define __atomic_xor_fetch
__ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_xor
__ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
#define __atomic_load_n
__ATOMIC_LOAD_N_FUNC_TAOS_FORBID
#define __atomic_store_n
__ATOMIC_STORE_N_FUNC_TAOS_FORBID
#define __atomic_exchange_n
__ATOMIC_EXCHANGE_N_FUNC_TAOS_FORBID
#define __sync_val_compare_and_swap __SYNC_VAL_COMPARE_AND_SWAP_FUNC_TAOS_FORBID
#define __atomic_add_fetch
__ATOMIC_ADD_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_add
__ATOMIC_FETCH_ADD_FUNC_TAOS_FORBID
#define __atomic_sub_fetch
__ATOMIC_SUB_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_sub
__ATOMIC_FETCH_SUB_FUNC_TAOS_FORBID
#define __atomic_and_fetch
__ATOMIC_AND_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_and
__ATOMIC_FETCH_AND_FUNC_TAOS_FORBID
#define __atomic_or_fetch
__ATOMIC_OR_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_or
__ATOMIC_FETCH_OR_FUNC_TAOS_FORBID
#define __atomic_xor_fetch
__ATOMIC_XOR_FETCH_FUNC_TAOS_FORBID
#define __atomic_fetch_xor
__ATOMIC_FETCH_XOR_FUNC_TAOS_FORBID
#endif
int8_t
atomic_load_8
(
int8_t
volatile
*
ptr
);
int16_t
atomic_load_16
(
int16_t
volatile
*
ptr
);
int32_t
atomic_load_32
(
int32_t
volatile
*
ptr
);
int64_t
atomic_load_64
(
int64_t
volatile
*
ptr
);
void
*
atomic_load_ptr
(
void
*
ptr
);
void
*
atomic_load_ptr
(
void
*
ptr
);
void
atomic_store_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
void
atomic_store_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
void
atomic_store_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
...
...
@@ -53,62 +53,62 @@ int8_t atomic_exchange_8(int8_t volatile *ptr, int8_t val);
int16_t
atomic_exchange_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_exchange_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_exchange_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_exchange_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_exchange_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_val_compare_exchange_8
(
int8_t
volatile
*
ptr
,
int8_t
oldval
,
int8_t
newval
);
int16_t
atomic_val_compare_exchange_16
(
int16_t
volatile
*
ptr
,
int16_t
oldval
,
int16_t
newval
);
int32_t
atomic_val_compare_exchange_32
(
int32_t
volatile
*
ptr
,
int32_t
oldval
,
int32_t
newval
);
int64_t
atomic_val_compare_exchange_64
(
int64_t
volatile
*
ptr
,
int64_t
oldval
,
int64_t
newval
);
void
*
atomic_val_compare_exchange_ptr
(
void
*
ptr
,
void
*
oldval
,
void
*
newval
);
void
*
atomic_val_compare_exchange_ptr
(
void
*
ptr
,
void
*
oldval
,
void
*
newval
);
int8_t
atomic_add_fetch_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_add_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_add_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_add_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_fetch_add_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_add_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_add_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_fetch_add_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_fetch_add_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_fetch_add_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_sub_fetch_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_sub_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_sub_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_sub_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_fetch_sub_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_sub_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_sub_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_fetch_sub_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_fetch_sub_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_fetch_sub_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_and_fetch_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_and_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_and_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_and_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_and_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_and_fetch_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_fetch_and_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_and_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_and_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_fetch_and_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_fetch_and_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_fetch_and_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_or_fetch_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_or_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_or_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_or_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_or_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_or_fetch_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_fetch_or_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_or_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_or_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_fetch_or_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_fetch_or_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_fetch_or_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_xor_fetch_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_xor_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_xor_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_xor_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_xor_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_xor_fetch_ptr
(
void
*
ptr
,
void
*
val
);
int8_t
atomic_fetch_xor_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_xor_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_xor_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_fetch_xor_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_fetch_xor_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_fetch_xor_ptr
(
void
*
ptr
,
void
*
val
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
8b6820ff
...
...
@@ -17,6 +17,147 @@
#include "mndDef.h"
#include "mndConsumer.h"
int32_t
tEncodeSStreamObj
(
SEncoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
sz
=
0
;
/*int32_t outputNameSz = 0;*/
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sourceDb
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
targetDb
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
targetSTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
targetStbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
dbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
waterMark
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
// TODO encode tasks
if
(
pObj
->
tasks
)
{
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
}
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGetP
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
if
(
tEncodeI32
(
pEncoder
,
innerSz
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pArray
,
j
);
if
(
tEncodeSStreamTask
(
pEncoder
,
pTask
)
<
0
)
return
-
1
;
}
}
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
&
pObj
->
outputSchema
)
<
0
)
return
-
1
;
#if 0
if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
#endif
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamObj
(
SDecoder
*
pDecoder
,
SStreamObj
*
pObj
)
{
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
sourceDb
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
targetDb
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
targetSTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
targetStbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
dbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
waterMark
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
pObj
->
tasks
=
NULL
;
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
innerSz
;
if
(
tDecodeI32
(
pDecoder
,
&
innerSz
)
<
0
)
return
-
1
;
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
void
*
));
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
pTask
);
}
taosArrayPush
(
pObj
->
tasks
,
&
pArray
);
}
}
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
&
pObj
->
outputSchema
)
<
0
)
return
-
1
;
#if 0
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->ColAlias == NULL) {
return -1;
}
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name);
}
#endif
return
0
;
}
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
)
{
SMqVgEp
*
pVgEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
if
(
pVgEpNew
==
NULL
)
return
NULL
;
pVgEpNew
->
vgId
=
pVgEp
->
vgId
;
pVgEpNew
->
qmsg
=
strdup
(
pVgEp
->
qmsg
);
pVgEpNew
->
epSet
=
pVgEp
->
epSet
;
return
pVgEpNew
;
}
void
tDeleteSMqVgEp
(
SMqVgEp
*
pVgEp
)
{
if
(
pVgEp
->
qmsg
)
taosMemoryFree
(
pVgEp
->
qmsg
);
}
int32_t
tEncodeSMqVgEp
(
void
**
buf
,
const
SMqVgEp
*
pVgEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pVgEp
->
vgId
);
tlen
+=
taosEncodeString
(
buf
,
pVgEp
->
qmsg
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
tlen
;
}
void
*
tDecodeSMqVgEp
(
const
void
*
buf
,
SMqVgEp
*
pVgEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pVgEp
->
vgId
);
buf
=
taosDecodeString
(
buf
,
&
pVgEp
->
qmsg
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
(
void
*
)
buf
;
}
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
])
{
SMqConsumerObj
*
pConsumer
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
...
...
@@ -187,34 +328,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
return
(
void
*
)
buf
;
}
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
)
{
SMqVgEp
*
pVgEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
if
(
pVgEpNew
==
NULL
)
return
NULL
;
pVgEpNew
->
vgId
=
pVgEp
->
vgId
;
pVgEpNew
->
qmsg
=
strdup
(
pVgEp
->
qmsg
);
pVgEpNew
->
epSet
=
pVgEp
->
epSet
;
return
pVgEpNew
;
}
void
tDeleteSMqVgEp
(
SMqVgEp
*
pVgEp
)
{
if
(
pVgEp
->
qmsg
)
taosMemoryFree
(
pVgEp
->
qmsg
);
}
int32_t
tEncodeSMqVgEp
(
void
**
buf
,
const
SMqVgEp
*
pVgEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pVgEp
->
vgId
);
tlen
+=
taosEncodeString
(
buf
,
pVgEp
->
qmsg
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
tlen
;
}
void
*
tDecodeSMqVgEp
(
const
void
*
buf
,
SMqVgEp
*
pVgEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pVgEp
->
vgId
);
buf
=
taosDecodeString
(
buf
,
&
pVgEp
->
qmsg
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
(
void
*
)
buf
;
}
SMqConsumerEp
*
tCloneSMqConsumerEp
(
const
SMqConsumerEp
*
pConsumerEpOld
)
{
SMqConsumerEp
*
pConsumerEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEp
));
if
(
pConsumerEpNew
==
NULL
)
return
NULL
;
...
...
@@ -413,119 +526,6 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return
(
void
*
)
buf
;
}
int32_t
tEncodeSStreamObj
(
SEncoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
sz
=
0
;
/*int32_t outputNameSz = 0;*/
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sourceDb
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
targetDb
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
targetSTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
targetStbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
dbUid
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
waterMark
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
sql
)
<
0
)
return
-
1
;
/*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
if
(
tEncodeCStr
(
pEncoder
,
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
// TODO encode tasks
if
(
pObj
->
tasks
)
{
sz
=
taosArrayGetSize
(
pObj
->
tasks
);
}
if
(
tEncodeI32
(
pEncoder
,
sz
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pArray
=
taosArrayGetP
(
pObj
->
tasks
,
i
);
int32_t
innerSz
=
taosArrayGetSize
(
pArray
);
if
(
tEncodeI32
(
pEncoder
,
innerSz
)
<
0
)
return
-
1
;
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pArray
,
j
);
if
(
tEncodeSStreamTask
(
pEncoder
,
pTask
)
<
0
)
return
-
1
;
}
}
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
&
pObj
->
outputSchema
)
<
0
)
return
-
1
;
#if 0
if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias);
}
if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1;
for (int32_t i = 0; i < outputNameSz; i++) {
char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
#endif
return
pEncoder
->
pos
;
}
int32_t
tDecodeSStreamObj
(
SDecoder
*
pDecoder
,
SStreamObj
*
pObj
)
{
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
sourceDb
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
targetDb
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pObj
->
targetSTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
targetStbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
createTime
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
updateTime
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
dbUid
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
createdBy
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pObj
->
trigger
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
triggerParam
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
waterMark
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pObj
->
fixedSinkVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pObj
->
smaId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
sql
)
<
0
)
return
-
1
;
/*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pObj
->
physicalPlan
)
<
0
)
return
-
1
;
pObj
->
tasks
=
NULL
;
int32_t
sz
;
if
(
tDecodeI32
(
pDecoder
,
&
sz
)
<
0
)
return
-
1
;
if
(
sz
!=
0
)
{
pObj
->
tasks
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
innerSz
;
if
(
tDecodeI32
(
pDecoder
,
&
innerSz
)
<
0
)
return
-
1
;
SArray
*
pArray
=
taosArrayInit
(
innerSz
,
sizeof
(
void
*
));
for
(
int32_t
j
=
0
;
j
<
innerSz
;
j
++
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
return
-
1
;
if
(
tDecodeSStreamTask
(
pDecoder
,
pTask
)
<
0
)
return
-
1
;
taosArrayPush
(
pArray
,
&
pTask
);
}
taosArrayPush
(
pObj
->
tasks
,
&
pArray
);
}
}
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
&
pObj
->
outputSchema
)
<
0
)
return
-
1
;
#if 0
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
if (pObj->ColAlias == NULL) {
return -1;
}
}
for (int32_t i = 0; i < outputNameSz; i++) {
char *name;
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name);
}
#endif
return
0
;
}
int32_t
tEncodeSMqOffsetObj
(
void
**
buf
,
const
SMqOffsetObj
*
pOffset
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pOffset
->
key
);
...
...
source/dnode/mnode/impl/test/trans/trans2.cpp
浏览文件 @
8b6820ff
source/dnode/vnode/src/tq/tq.c
浏览文件 @
8b6820ff
...
...
@@ -1031,3 +1031,37 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId)
}
return
0
;
}
int32_t
tqProcessTaskExec2
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTaskExecReq
req
=
{
0
};
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
ASSERT
(
pTask
);
ASSERT
(
pTask
->
inputType
==
TASK_INPUT_TYPE__DATA_BLOCK
);
// enqueue
int32_t
inputStatus
=
streamEnqueueDataBlk
(
pTask
,
(
SStreamDataBlock
*
)
req
.
data
);
if
(
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO rsp blocked
return
0
;
}
// try exec
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
if
(
streamTaskExecNew
(
pTask
)
<
0
)
{
atomic_store_8
(
&
pTask
->
status
,
TASK_STATUS__CLOSING
);
goto
FAIL
;
}
}
else
if
(
execStatus
==
TASK_STATUS__EXECUTING
)
{
return
0
;
}
// TODO rsp success
return
0
;
FAIL:
return
-
1
;
}
source/libs/stream/src/tstream.c
浏览文件 @
8b6820ff
...
...
@@ -16,6 +16,25 @@
#include "tstream.h"
#include "executor.h"
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pOutput
->
type
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pOutput
->
sourceVg
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pOutput
->
sourceVer
);
ASSERT
(
pOutput
->
type
==
STREAM_INPUT__DATA_BLOCK
);
tlen
+=
tEncodeDataBlocks
(
buf
,
pOutput
->
blocks
);
return
tlen
;
}
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pInput
->
type
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pInput
->
sourceVg
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pInput
->
sourceVer
);
ASSERT
(
pInput
->
type
==
STREAM_INPUT__DATA_BLOCK
);
buf
=
tDecodeDataBlocks
(
buf
,
&
pInput
->
blocks
);
return
(
void
*
)
buf
;
}
static
int32_t
streamBuildDispatchMsg
(
SStreamTask
*
pTask
,
SArray
*
data
,
SRpcMsg
*
pMsg
,
SEpSet
**
ppEpSet
)
{
SStreamTaskExecReq
req
=
{
.
streamId
=
pTask
->
streamId
,
...
...
@@ -97,6 +116,226 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
return
0
;
}
int32_t
streamEnqueueDataSubmit
(
SStreamTask
*
pTask
,
SStreamDataSubmit
*
input
)
{
ASSERT
(
pTask
->
inputType
==
TASK_INPUT_TYPE__SUMBIT_BLOCK
);
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
if
(
inputStatus
==
TASK_INPUT_STATUS__NORMAL
)
{
streamDataSubmitRefInc
(
input
);
taosWriteQitem
(
pTask
->
inputQ
,
input
);
}
return
inputStatus
;
}
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
)
{
ASSERT
(
pTask
->
inputType
==
TASK_INPUT_TYPE__DATA_BLOCK
);
taosWriteQitem
(
pTask
->
inputQ
,
input
);
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
return
inputStatus
;
}
int32_t
streamTaskProcessTriggerReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
char
*
msg
,
int32_t
msgLen
)
{
//
return
0
;
}
int32_t
streamTaskProcessInputReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDataBlock
*
pBlock
,
SRpcMsg
*
pRsp
)
{
// 1. handle input
// 1.1 enqueue
taosWriteQitem
(
pTask
->
inputQ
,
pBlock
);
// 1.2 calc back pressure
// 1.3 rsp by input status
int8_t
inputStatus
=
atomic_load_8
(
&
pTask
->
inputStatus
);
SStreamDispatchRsp
*
pCont
=
rpcMallocCont
(
sizeof
(
SStreamDispatchRsp
));
pCont
->
status
=
inputStatus
;
pRsp
->
pCont
=
pCont
;
pRsp
->
contLen
=
sizeof
(
SStreamDispatchRsp
);
tmsgSendRsp
(
pRsp
);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
const
SArray
*
blocks
=
pBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
taosArrayPush
(
pRes
,
&
output
);
}
// TODO: wrap destroy block
taosArrayDestroyP
(
pBlock
->
blocks
,
(
FDelete
)
blockDataDestroy
);
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SArray
**
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
*
resQ
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
}
}
else
if
(
execStatus
==
TASK_STATUS__CLOSING
)
{
continue
;
}
else
if
(
execStatus
==
TASK_STATUS__EXECUTING
)
break
;
else
{
ASSERT
(
0
);
}
}
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
STaosQall
*
qall
=
taosAllocateQall
();
taosReadAllQitems
(
pTask
->
outputQ
,
qall
);
SArray
**
ppRes
=
NULL
;
while
(
1
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
ppRes
);
if
(
ppRes
==
NULL
)
break
;
SArray
*
pRes
=
*
ppRes
;
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
pBlock
->
sourceVer
,
pRes
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
}
else
{
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildDispatchMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildDispatchMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pMsgCb
,
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
}
//
return
0
;
}
int32_t
streamTaskProcessDispatchRsp
(
SStreamTask
*
pTask
,
char
*
msg
,
int32_t
msgLen
)
{
//
return
0
;
}
int32_t
streamTaskProcessRecoverReq
(
SStreamTask
*
pTask
,
char
*
msg
)
{
//
return
0
;
}
int32_t
streamTaskExecNew
(
SStreamTask
*
pTask
)
{
SArray
*
pRes
=
NULL
;
if
(
pTask
->
execType
==
TASK_EXEC__PIPE
||
pTask
->
execType
==
TASK_EXEC__MERGE
)
{
// TODO remove multi runner
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
int8_t
status
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
if
(
status
==
TASK_STATUS__IDLE
)
{
pRes
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
pRes
==
NULL
)
{
return
-
1
;
}
void
*
input
=
NULL
;
taosWriteQitem
(
pTask
->
inputQ
,
&
input
);
if
(
input
==
NULL
)
return
0
;
// TODO: fix type
if
(
pTask
->
sourceType
==
TASK_SOURCE__SCAN
)
{
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
input
;
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
taosArrayPush
(
pRes
,
&
output
);
}
streamDataSubmitRefDec
(
pSubmit
);
}
else
{
SStreamDataBlock
*
pStreamBlock
=
(
SStreamDataBlock
*
)
input
;
const
SArray
*
blocks
=
pStreamBlock
->
blocks
;
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
=
0
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
break
;
taosArrayPush
(
pRes
,
&
output
);
}
// TODO: wrap destroy block
taosArrayDestroyP
(
pStreamBlock
->
blocks
,
(
FDelete
)
blockDataDestroy
);
}
if
(
taosArrayGetSize
(
pRes
)
!=
0
)
{
SArray
**
resQ
=
taosAllocateQitem
(
sizeof
(
void
**
),
DEF_QITEM
);
*
resQ
=
pRes
;
taosWriteQitem
(
pTask
->
outputQ
,
resQ
);
}
}
}
return
0
;
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
...
...
@@ -251,9 +490,17 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
}
pTask
->
taskId
=
tGenIdPI32
();
pTask
->
streamId
=
streamId
;
pTask
->
status
=
STREAM_TASK_STATUS__RUNNING
;
/*pTask->qmsg = NULL;*/
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputQ
=
taosOpenQueue
();
pTask
->
outputQ
=
taosOpenQueue
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
)
goto
FAIL
;
return
pTask
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
NULL
;
}
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
...
...
@@ -349,10 +596,16 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void
tFreeSStreamTask
(
SStreamTask
*
pTask
)
{
taosCloseQueue
(
pTask
->
inputQ
);
taosCloseQueue
(
pTask
->
outputQ
);
// TODO
/*taosMemoryFree(pTask->qmsg);*/
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
for
(
int32_t
i
=
0
;
i
<
pTask
->
exec
.
numOfRunners
;
i
++
)
{
qDestroyTask
(
pTask
->
exec
.
runners
[
i
].
executor
);
}
taosMemoryFree
(
pTask
->
exec
.
runners
);
/*taosMemoryFree(pTask->executor);*/
/*taosMemoryFree(pTask);*/
taosMemoryFree
(
pTask
);
}
#if 0
...
...
tests/script/tsim/tstream/basic1.sim
浏览文件 @
8b6820ff
...
...
@@ -136,7 +136,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep
1
00
sleep
5
00
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录