Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a393dee9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a393dee9
编写于
5月 23, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(stream)
上级
403b2ba1
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
74 addition
and
622 deletion
+74
-622
cmake/cmake.define
cmake/cmake.define
+2
-2
example/src/tstream.c
example/src/tstream.c
+1
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+6
-10
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+1
-3
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-5
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+48
-418
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-18
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+14
-161
未找到文件。
cmake/cmake.define
浏览文件 @
a393dee9
...
...
@@ -71,8 +71,8 @@ ELSE ()
ENDIF ()
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -
static-libasan -
g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -
static-libasan -
g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
...
...
example/src/tstream.c
浏览文件 @
a393dee9
...
...
@@ -82,9 +82,7 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pConn
,
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)"
);
pConn
,
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/libs/stream/tstream.h
浏览文件 @
a393dee9
...
...
@@ -115,16 +115,14 @@ int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
);
typedef
struct
{
void
*
inputHandle
;
void
*
executor
;
}
SStreamRunner
;
typedef
struct
{
int8_t
parallelizable
;
char
*
qmsg
;
// followings are not applicable to encoder and decoder
int8_t
numOfRunners
;
SStreamRunner
*
runners
;
void
*
inputHandle
;
void
*
executor
;
}
STaskExec
;
typedef
struct
{
...
...
@@ -320,17 +318,15 @@ int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
stream
Task
ProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
stream
Task
ProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
);
int32_t
stream
Task
ProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
stream
Task
ProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
);
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
#ifdef __cplusplus
}
...
...
source/dnode/snode/src/snode.c
浏览文件 @
a393dee9
...
...
@@ -57,9 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
for
(
int
i
=
0
;
i
<
pTask
->
exec
.
numOfRunners
;
i
++
)
{
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
NULL
);
}
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
NULL
);
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
a393dee9
...
...
@@ -173,8 +173,7 @@ typedef struct {
}
STqExec
;
struct
STQ
{
char
*
path
;
// STqMetaStore* tqMeta;
char
*
path
;
SHashObj
*
pushMgr
;
// consumerId -> STqExec*
SHashObj
*
execs
;
// subKey -> STqExec
SHashObj
*
pStreamTasks
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
a393dee9
...
...
@@ -123,11 +123,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
#if 0
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
#endif
int32_t
tqProcessStreamTriggerNew
(
STQ
*
pTq
,
SSubmitReq
*
data
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
data
);
int32_t
tqProcessTaskRunReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a393dee9
...
...
@@ -36,15 +36,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
/*ASSERT(0);*/
/*}*/
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq);
return NULL;
}
#endif
pTq
->
execs
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -65,48 +56,6 @@ void tqClose(STQ* pTq) {
// TODO
}
static
void
tdSRowDemo
()
{
#define DEMO_N_COLS 3
int16_t
schemaVersion
=
0
;
int32_t
numOfCols
=
DEMO_N_COLS
;
// ts + int
SRowBuilder
rb
=
{
0
};
SSchema
schema
[
DEMO_N_COLS
]
=
{
{.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
colId
=
1
,
.
name
=
"ts"
,
.
bytes
=
8
,
.
flags
=
COL_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
2
,
.
name
=
"c1"
,
.
bytes
=
4
,
.
flags
=
COL_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
3
,
.
name
=
"c2"
,
.
bytes
=
4
,
.
flags
=
COL_SMA_ON
}};
SSchema
*
pSchema
=
schema
;
STSchema
*
pTSChema
=
tdGetSTSChemaFromSSChema
(
&
pSchema
,
numOfCols
);
tdSRowInit
(
&
rb
,
schemaVersion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
pTSChema
->
flen
);
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSChema
);
void
*
row
=
taosMemoryCalloc
(
1
,
maxLen
);
// make sure the buffer is enough
// set row buf
tdSRowResetBuf
(
&
rb
,
row
);
for
(
int32_t
idx
=
0
;
idx
<
pTSChema
->
numOfCols
;
++
idx
)
{
STColumn
*
pColumn
=
pTSChema
->
columns
+
idx
;
if
(
idx
==
0
)
{
int64_t
tsKey
=
1651234567
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
tsKey
,
true
,
pColumn
->
offset
,
idx
);
}
else
if
(
idx
==
1
)
{
int32_t
val1
=
10
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
val1
,
true
,
pColumn
->
offset
,
idx
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NONE
,
NULL
,
true
,
pColumn
->
offset
,
idx
);
}
}
// print
tdSRowPrint
(
row
,
pTSChema
,
__func__
);
taosMemoryFree
(
pTSChema
);
}
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
@@ -261,32 +210,20 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
// make sure msgType == TDMT_VND_SUBMIT
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
return
-
1
;
}
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO error handle
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
tqProcessStreamTrigger
(
pTq
,
data
);
}
memcpy
(
data
,
msg
,
msgLen
);
tqProcessStreamTriggerNew
(
pTq
,
data
);
#if 0
SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER,
.pCont = data,
.contLen = msgLen,
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#endif
return
0
;
}
...
...
@@ -685,213 +622,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return
0
;
}
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0};
rspV2.dataLen = 0;
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t topicSz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < topicSz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
TD_VID(pTq->pVnode));
rspV2.reqOffset = pReq->currentOffset;
rspV2.skipLogNum = 0;
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
break;
}
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
}
taosArrayPush(pRes, pDataBlock);
}
if (taosArrayGetSize(pRes) == 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++;
rspV2.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rspV2.rspOffset = fetchOffset;
int32_t blockSz = taosArrayGetSize(pRes);
int32_t dataBlockStrLen = 0;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pBlock = taosArrayGet(pRes, i);
dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
}
void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen);
if (dataBlockBuf == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
}
rspV2.blockData = dataBlockBuf;
int32_t pos;
rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t));
for (int32_t i = 0; i < blockSz; i++) {
pos = 0;
SSDataBlock* pBlock = taosArrayGet(pRes, i);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf;
pRetrieve->useconds = 0;
pRetrieve->precision = 0;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false);
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
int32_t totLen = sizeof(SRetrieveTableRsp) + pos;
pRetrieve->compLen = htonl(totLen);
rspV2.dataLen += totLen;
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen);
}
ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen);
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(msgLen);
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2);
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(&resp);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rspV2.skipLogNum++;
}
}
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
rspV2.rspOffset = fetchOffset - 1;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
return 0;
}
#endif
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SMqVDeleteReq
*
pReq
=
(
SMqVDeleteReq
*
)
msg
;
...
...
@@ -981,7 +711,18 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
==
0
);
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
msgLen
);
if
(
tDecodeSStreamTask
(
&
decoder
,
pTask
)
<
0
)
{
ASSERT
(
0
);
}
tDecoderClear
(
&
decoder
);
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
...
...
@@ -994,57 +735,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
// expand runners
pTask
->
exec
.
numOfRunners
=
parallel
;
pTask
->
exec
.
runners
=
taosMemoryCalloc
(
parallel
,
sizeof
(
SStreamRunner
));
if
(
pTask
->
exec
.
runners
==
NULL
)
{
goto
FAIL
;
}
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
STqReadHandle
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
};
pTask
->
exec
.
runners
[
i
].
inputHandle
=
pStreamReader
;
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
runners
[
i
].
executor
);
}
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
;
}
return
0
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
-
1
;
}
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
msgLen
);
if
(
tDecodeSStreamTask
(
&
decoder
,
pTask
)
<
0
)
{
ASSERT
(
0
);
}
tDecoderClear
(
&
decoder
);
// exec
if
(
tqExpandTask
(
pTq
,
pTask
,
4
)
<
0
)
{
ASSERT
(
0
);
STqReadHandle
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
};
pTask
->
exec
.
inputHandle
=
pStreamReader
;
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
executor
);
}
// sink
...
...
@@ -1052,8 +755,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
;
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
pTask
->
tbSink
.
pTSchema
=
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
...
...
@@ -1061,94 +768,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
return
0
;
}
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
workerId
)
<
0
)
{
// TODO
}
}
return
0
;
}
#if 0
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
SStreamDataSubmit* pSubmit = NULL;
// build data
pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pSubmit == NULL) return -1;
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
if (pSubmit->dataRef == NULL) goto FAIL;
*pSubmit->dataRef = 1;
pSubmit->data = data;
pSubmit->type = STREAM_INPUT__DATA_BLOCK;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) {
streamEnqueueDataSubmit(pTask, pSubmit);
// TODO cal back pressure
}
// check run
int8_t execStatus = atomic_load_8(&pTask->status);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
if (pReq == NULL) continue;
// TODO: do we need htonl?
pReq->head.vgId = pTq->pVnode->config.vgId;
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = 0,
.pCont = pReq,
.contLen = sizeof(SStreamTaskRunReq),
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
}
}
streamDataSubmitRefDec(pSubmit);
return
0
;
FAIL:
if (pSubmit) {
if (pSubmit->dataRef) {
taosMemoryFree(pSubmit->dataRef);
}
taosFreeQitem(pSubmit);
}
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
-
1
;
}
#endif
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
ASSERT
(
taskId
);
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
ASSERT
(
pTask
);
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
req
.
data
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
workerId
)
<
0
)
{
// TODO
}
return
0
;
}
int32_t
tqProcessStreamTrigger
New
(
STQ
*
pTq
,
SSubmitReq
*
pReq
)
{
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
pReq
)
{
void
*
pIter
=
NULL
;
bool
failed
=
false
;
...
...
@@ -1234,7 +864,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessDispatchReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
streamProcessDispatchReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
}
...
...
@@ -1242,7 +872,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
streamProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
}
...
...
@@ -1250,7 +880,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
streamProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
return
0
;
}
...
...
@@ -1258,6 +888,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessRecoverRsp
(
pTask
,
pRsp
);
streamProcessRecoverRsp
(
pTask
,
pRsp
);
return
0
;
}
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
a393dee9
...
...
@@ -106,13 +106,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
break
;
#if 0
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
0) < 0) {
}
} break;
#endif
case
TDMT_VND_ALTER_VNODE
:
break
;
default:
...
...
@@ -195,17 +188,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case
TDMT_VND_TASK_RECOVER_RSP
:
return
tqProcessTaskRecoverRsp
(
pVnode
->
pTq
,
pMsg
);
#if 0
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER:{
// refactor, avoid double free
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
pMsg->pCont = NULL;
return code;
}
#endif
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
default:
...
...
source/libs/stream/src/tstream.c
浏览文件 @
a393dee9
...
...
@@ -134,7 +134,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
}
static
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
...
...
@@ -171,12 +171,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
// TODO: handle version
int32_t
stream
TaskExec2
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
stream
Exec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
void
*
exec
=
pTask
->
exec
.
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run, from qall, handle failure from last exec
while
(
1
)
{
...
...
@@ -278,7 +278,7 @@ FAIL:
return
-
1
;
}
int32_t
stream
Task
Sink
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamSink
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
bool
firstRun
=
1
;
while
(
1
)
{
SStreamDataBlock
*
pBlock
=
NULL
;
...
...
@@ -407,7 +407,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
return
0
;
}
int32_t
stream
Task
ProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
// 1. handle input
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
...
...
@@ -415,172 +415,42 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
stream
TaskExec2
(
pTask
,
pMsgCb
);
stream
Exec
(
pTask
,
pMsgCb
);
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
stream
Task
Sink
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
stream
Task
ProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
}
// continue dispatch
stream
Task
Sink
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
stream
TaskExec2
(
pTask
,
pMsgCb
);
stream
Task
Sink
(
pTask
,
pMsgCb
);
stream
Exec
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
}
int32_t
stream
Task
ProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
//
return
0
;
}
int32_t
stream
Task
ProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
//
return
0
;
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
&&
pTask
->
sourceType
!=
TASK_SOURCE__SCAN
)
return
0
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
ASSERT
(
workId
<
pTask
->
exec
.
numOfRunners
);
void
*
exec
=
pTask
->
exec
.
runners
[
workId
].
executor
;
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
{
return
-
1
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
qSetStreamInput
(
exec
,
input
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
}
else
if
(
inputType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
const
SArray
*
blocks
=
(
const
SArray
*
)
input
;
/*int32_t sz = taosArrayGetSize(blocks);*/
/*for (int32_t i = 0; i < sz; i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
/*qSetStreamInput(exec, pBlock, inputType);*/
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
/*}*/
}
else
{
ASSERT
(
0
);
}
}
else
{
ASSERT
(
inputType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
);
pRes
=
(
SArray
*
)
input
;
}
if
(
pRes
==
NULL
||
taosArrayGetSize
(
pRes
)
==
0
)
return
0
;
// sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
// blockDebugShowData(pRes);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pRes
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
//
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
return
0
;
}
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
streamId
);
...
...
@@ -607,20 +477,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
pTask
->
streamId
=
streamId
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputQ
=
taosOpenQueue
();
pTask
->
outputQ
=
taosOpenQueue
();
pTask
->
inputQAll
=
taosAllocateQall
();
pTask
->
outputQAll
=
taosAllocateQall
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
return
pTask
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
NULL
;
}
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
...
...
@@ -722,11 +579,7 @@ void tFreeSStreamTask(SStreamTask* pTask) {
taosCloseQueue
(
pTask
->
outputQ
);
// TODO
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
for
(
int32_t
i
=
0
;
i
<
pTask
->
exec
.
numOfRunners
;
i
++
)
{
qDestroyTask
(
pTask
->
exec
.
runners
[
i
].
executor
);
}
taosMemoryFree
(
pTask
->
exec
.
runners
);
/*taosMemoryFree(pTask->executor);*/
qDestroyTask
(
pTask
->
exec
.
executor
);
taosMemoryFree
(
pTask
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录