未验证 提交 931a763e 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13578 from taosdata/feature/stream

enh(stream): direct sink if not dispatch
......@@ -311,7 +311,16 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
}
static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) {
taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
}
return 0;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
......
......@@ -24,10 +24,13 @@ extern "C" {
#endif
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
// int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet);
int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb);
#ifdef __cplusplus
}
......
......@@ -83,7 +83,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
streamSink1(pTask, pMsgCb);
/*streamSink1(pTask, pMsgCb);*/
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatchAll(pTask, pMsgCb);
}
return 0;
}
......@@ -97,13 +100,19 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
return 0;
}
// continue dispatch
streamSink1(pTask, pMsgCb);
/*streamSink1(pTask, pMsgCb);*/
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatchAll(pTask, pMsgCb);
}
return 0;
}
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
streamExec(pTask, pMsgCb);
streamSink1(pTask, pMsgCb);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatchAll(pTask, pMsgCb);
}
/*streamSink1(pTask, pMsgCb);*/
return 0;
}
......
......@@ -174,52 +174,6 @@ FAIL:
return code;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
#if 0
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
return 0;
}
#endif
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
int32_t qType;
if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {
qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {
qType = WRITE_QUEUE;
} else {
ASSERT(0);
}
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
}
return 0;
}
#if 0
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = {
......
......@@ -15,6 +15,21 @@
#include "streamInc.h"
int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
while (1) {
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) break;
ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK);
streamDispatch(pTask, pMsgCb, pBlock);
/*streamQueueProcessSuccess(pTask->outputQueue);*/
}
return 0;
}
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
SStreamQueue* queue;
if (pTask->execType == TASK_EXEC__NONE) {
......@@ -58,6 +73,57 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
return 0;
}
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
#if 1
int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) {
return 0;
}
#endif
ASSERT(pTask->dispatchType != TASK_DISPATCH__INPLACE);
/*if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {*/
/*SRpcMsg dispatchMsg = {0};*/
/*if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {*/
/*ASSERT(0);*/
/*return -1;*/
/*}*/
/*int32_t qType;*/
/*if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {*/
/*qType = FETCH_QUEUE;*/
/*} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {*/
/*qType = WRITE_QUEUE;*/
/*} else {*/
/*ASSERT(0);*/
/*}*/
/*tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);*/
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
tmsgSendReq(pEpSet, &dispatchMsg);
}
return 0;
}
#if 0
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
bool firstRun = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册