diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index af934f0437cba73d5a7711a594d03c64409bddc5..ddd7e1cd02b69c74469da66623372c535d873c5b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -131,14 +131,7 @@ static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) { SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq); -static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { - int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); - ASSERT(ref >= 0); - if (ref == 0) { - taosMemoryFree(pDataSubmit->data); - taosMemoryFree(pDataSubmit->dataRef); - } -} +void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit); SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit); @@ -189,6 +182,7 @@ typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef struct { int64_t smaId; // following are not applicable to encoder and decoder + void* vnode; FSmaSink* smaSink; } STaskSinkSma; @@ -270,7 +264,7 @@ struct SStreamTask { SStreamQueue* outputQueue; // application storage - void* ahandle; + // void* ahandle; }; SStreamTask* tNewSStreamTask(int64_t streamId, int32_t childId); @@ -316,7 +310,7 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); } else if (pTask->sinkType == TASK_SINK__SMA) { ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); + pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); } else { ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); taosWriteQitem(pTask->outputQueue->queue, pBlock); @@ -328,26 +322,11 @@ typedef struct { int32_t reserved; } SStreamTaskDeployRsp; -typedef struct { - // SMsgHead head; - int64_t streamId; - int32_t taskId; - SArray* data; // SArray -} SStreamTaskExecReq; - typedef struct { // SMsgHead head; SStreamTask* task; } SStreamTaskDeployReq; -int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq); -void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq); -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq); - -typedef struct { - int32_t reserved; -} SStreamTaskExecRsp; - typedef struct { SMsgHead head; int64_t streamId; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b5e18c1dbe71d9fdd5288f3db1d745926a49115..181cddee47bab68470147f3d79da4db38bf9cc7d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -349,8 +349,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } // sink - pTask->ahandle = pTq->pVnode; + /*pTask->ahandle = pTq->pVnode;*/ if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->sinkType == TASK_SINK__TABLE) { pTask->tbSink.vnode = pTq->pVnode; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 76e228632d9da2f3dc0b0d2a74e5a6a74741677f..27746fff783182999b4a5780387fa48b04a564d9 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -24,14 +24,10 @@ extern "C" { #endif int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); -// int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); -int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); - +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); -int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb); - #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b865b503399c8e5856b3368675c41f6b945562be..823ce6d2fe68703b42f10e1036c25428b08939ec 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -83,9 +83,8 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp // 3. handle output // 3.1 check and set status // 3.2 dispatch / sink - /*streamSink1(pTask, pMsgCb);*/ if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatchAll(pTask, pMsgCb); + streamDispatch(pTask, pMsgCb); } return 0; @@ -100,9 +99,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp return 0; } // continue dispatch - /*streamSink1(pTask, pMsgCb);*/ if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatchAll(pTask, pMsgCb); + streamDispatch(pTask, pMsgCb); } return 0; } @@ -110,9 +108,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { streamExec(pTask, pMsgCb); if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatchAll(pTask, pMsgCb); + streamDispatch(pTask, pMsgCb); } - /*streamSink1(pTask, pMsgCb);*/ return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 4a242d9c010816e36c405908d8f0b8dc47f5bed8..473ad3275228c9cc9098366942ce6a4011f82a9d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -86,22 +86,3 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } - -#if 0 -int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->streamId); - tlen += taosEncodeFixedI32(buf, pReq->taskId); - tlen += tEncodeDataBlocks(buf, pReq->data); - return tlen; -} - -void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->streamId); - buf = taosDecodeFixedI32(buf, &pReq->taskId); - buf = tDecodeDataBlocks(buf, &pReq->data); - return (void*)buf; -} - -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); } -#endif diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamDispatch.c similarity index 72% rename from source/libs/stream/src/streamMsg.c rename to source/libs/stream/src/streamDispatch.c index 0cdbea9c67ab4f8fdf079a9cd34dd82909f2d771..d1e3fa079916b1413ef189b2eb71680cfbbafb0a 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamDispatch.c @@ -174,85 +174,26 @@ FAIL: return code; } -#if 0 -static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { - SStreamTaskExecReq req = { - .streamId = pTask->streamId, - .data = data, - }; - - int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req); - void* buf = rpcMallocCont(tlen); - - if (buf == NULL) { - return -1; +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { +#if 1 + int8_t old = + atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + if (old != TASK_OUTPUT_STATUS__NORMAL) { + return 0; } +#endif - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - ((SMsgHead*)buf)->vgId = 0; - req.taskId = pTask->inplaceDispatcher.taskId; - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); - *ppEpSet = &pTask->fixedEpDispatcher.epSet; - req.taskId = pTask->fixedEpDispatcher.taskId; - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - // TODO use general name rule of schemaless - char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0}; - // all groupId must be the same in an array - SSDataBlock* pBlock = taosArrayGet(data, 0); - sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); - - // TODO: get hash function by hashMethod - - // get groupId, compute hash value - uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); + if (pBlock == NULL) return 0; + ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); - // get node - // TODO: optimize search process - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(vgInfo); - int32_t nodeId = 0; - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - nodeId = pVgInfo->vgId; - req.taskId = pVgInfo->taskId; - *ppEpSet = &pVgInfo->epSet; - break; - } - } - ASSERT(nodeId != 0); - ((SMsgHead*)buf)->vgId = htonl(nodeId); + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; } - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSStreamTaskExecReq(&abuf, &req); - - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - pMsg->msgType = pTask->dispatchMsgType; - pMsg->info.noResp = 1; - + tmsgSendReq(pEpSet, &dispatchMsg); return 0; } - -static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashObj* data) { - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(data, pIter); - if (pIter == NULL) return 0; - SArray* pData = *(SArray**)pIter; - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet; - if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - tmsgSendReq(pEpSet, &dispatchMsg); - } - return 0; -} -#endif diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index f82ef1b42fff735e92106eee97731a2c14b1d380..d5a4da60f56b08a9be47406280b6a812701c3870 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -41,3 +41,12 @@ void streamQueueClose(SStreamQueue* queue) { return; } } + +void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { + int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); + ASSERT(ref >= 0); + if (ref == 0) { + taosMemoryFree(pDataSubmit->data); + taosMemoryFree(pDataSubmit->dataRef); + } +} diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c deleted file mode 100644 index a5f95c4d45c3a8176b35ad56d8349babc59e1d48..0000000000000000000000000000000000000000 --- a/source/libs/stream/src/streamSink.c +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "streamInc.h" - -int32_t streamDispatchAll(SStreamTask* pTask, SMsgCb* pMsgCb) { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); - while (1) { - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); - if (pBlock == NULL) break; - ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); - - streamDispatch(pTask, pMsgCb, pBlock); - - /*streamQueueProcessSuccess(pTask->outputQueue);*/ - } - return 0; -} - -int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { - SStreamQueue* queue; - if (pTask->execType == TASK_EXEC__NONE) { - queue = pTask->inputQueue; - } else { - queue = pTask->outputQueue; - } - - /*if (streamDequeueBegin(queue) == true) {*/ - /*return -1;*/ - /*}*/ - - if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA || - pTask->dispatchType != TASK_DISPATCH__NONE) { - while (1) { - SStreamDataBlock* pBlock = streamQueueNextItem(queue); - if (pBlock == NULL) break; - ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); - - // local sink - if (pTask->sinkType == TASK_SINK__TABLE) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); - } else if (pTask->sinkType == TASK_SINK__SMA) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); - } - - // TODO: sink and dispatch should be only one - if (pTask->dispatchType != TASK_DISPATCH__NONE) { - ASSERT(queue == pTask->outputQueue); - ASSERT(pTask->sinkType == TASK_SINK__NONE); - - streamDispatch(pTask, pMsgCb, pBlock); - } - - streamQueueProcessSuccess(queue); - } - } - - return 0; -} - -int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { -#if 1 - int8_t old = - atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); - if (old != TASK_OUTPUT_STATUS__NORMAL) { - return 0; - } -#endif - ASSERT(pTask->dispatchType != TASK_DISPATCH__INPLACE); - - /*if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {*/ - /*SRpcMsg dispatchMsg = {0};*/ - /*if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {*/ - /*ASSERT(0);*/ - /*return -1;*/ - /*}*/ - - /*int32_t qType;*/ - /*if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {*/ - /*qType = FETCH_QUEUE;*/ - /*} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {*/ - /*qType = WRITE_QUEUE;*/ - /*} else {*/ - /*ASSERT(0);*/ - /*}*/ - /*tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);*/ - /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ - - if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - } - return 0; -} - -#if 0 -int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { - bool firstRun = 1; - while (1) { - SStreamDataBlock* pBlock = NULL; - if (!firstRun) { - taosReadAllQitems(pTask->outputQ, pTask->outputQAll); - } - taosGetQitem(pTask->outputQAll, (void**)&pBlock); - if (pBlock == NULL) { - if (firstRun) { - firstRun = 0; - continue; - } else { - break; - } - } - - SArray* pRes = pBlock->blocks; - - // sink - if (pTask->sinkType == TASK_SINK__TABLE) { - // blockDebugShowData(pRes); - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); - } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); - // - } else if (pTask->sinkType == TASK_SINK__FETCH) { - // - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - // dispatch - // TODO dispatch guard - int8_t outputStatus = atomic_load_8(&pTask->outputStatus); - if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) { - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - SRpcMsg dispatchMsg = {0}; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { - ASSERT(0); - return -1; - } - - int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { - qType = FETCH_QUEUE; - /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/ - /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/ - /*qType = MERGE_QUEUE;*/ - /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/ - /*qType = WRITE_QUEUE;*/ - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (pShuffleRes == NULL) { - return -1; - } - - int32_t sz = taosArrayGetSize(pRes); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRes, i); - SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); - if (pArray == NULL) { - pArray = taosArrayInit(0, sizeof(SSDataBlock)); - if (pArray == NULL) { - return -1; - } - taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); - } - taosArrayPush(pArray, pDataBlock); - } - - if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { - return -1; - } - - } else { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - } - } - } - return 0; -} -#endif