From 03b216230931c4e8023e64a288b2b10fea11d002 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 25 Mar 2022 21:03:12 +0800 Subject: [PATCH] refactor stream --- include/common/tmsg.h | 167 +------------------------ include/libs/stream/tstream.h | 164 ++++++++++++++++++++++++ source/common/src/tmsg.c | 103 --------------- source/dnode/mnode/impl/CMakeLists.txt | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/snode/CMakeLists.txt | 1 + source/dnode/snode/inc/sndInt.h | 1 + source/libs/stream/src/tstream.c | 103 +++++++++++++++ 8 files changed, 272 insertions(+), 270 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 91765c90da..0ed2a6a006 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -207,7 +207,7 @@ typedef struct { // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id - int64_t suid; // stable id + int64_t suid; // stable id int32_t padding; // TODO just for padding here int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head @@ -2358,171 +2358,6 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p } return buf; } - -enum { - STREAM_TASK_STATUS__RUNNING = 1, - STREAM_TASK_STATUS__STOP, -}; - -// pipe -> fetch/pipe queue -// merge -> merge queue -// write -> write queue -enum { - TASK_SINK_MSG__SND_PIPE = 1, - TASK_SINK_MSG__SND_MERGE, - TASK_SINK_MSG__VND_PIPE, - TASK_SINK_MSG__VND_MERGE, - TASK_SINK_MSG__VND_WRITE, -}; - -typedef struct { - int32_t nodeId; // 0 for snode - SEpSet epSet; -} SStreamTaskEp; - -typedef struct { - void* inputHandle; - void* executor; -} SStreamRunner; - -typedef struct { - int8_t parallelizable; - char* qmsg; - // followings are not applicable to encoder and decoder - int8_t numOfRunners; - SStreamRunner* runners; -} STaskExec; - -typedef struct { - int8_t reserved; -} STaskDispatcherInplace; - -typedef struct { - int32_t nodeId; - SEpSet epSet; -} STaskDispatcherFixedEp; - -typedef struct { - int8_t hashMethod; - SArray* info; -} STaskDispatcherShuffle; - -typedef struct { - int8_t reserved; - // not applicable to encoder and decoder - SHashObj* pHash; // groupId to tbuid -} STaskSinkTb; - -typedef struct { - int8_t reserved; -} STaskSinkSma; - -typedef struct { - int8_t reserved; -} STaskSinkFetch; - -typedef struct { - int8_t reserved; -} STaskSinkShow; - -enum { - TASK_SOURCE__SCAN = 1, - TASK_SOURCE__SINGLE, - TASK_SOURCE__MULTI, -}; - -enum { - TASK_EXEC__NONE = 1, - TASK_EXEC__EXEC, -}; - -enum { - TASK_DISPATCH__NONE = 1, - TASK_DISPATCH__INPLACE, - TASK_DISPATCH__FIXED, - TASK_DISPATCH__SHUFFLE, -}; - -enum { - TASK_SINK__NONE = 1, - TASK_SINK__TABLE, - TASK_SINK__SMA, - TASK_SINK__FETCH, - TASK_SINK__SHOW, -}; - -typedef struct { - int64_t streamId; - int32_t taskId; - int8_t status; - - int8_t sourceType; - int8_t execType; - int8_t sinkType; - int8_t dispatchType; - int16_t dispatchMsgType; - int32_t downstreamTaskId; - - // source preprocess - - // exec - STaskExec exec; - - // local sink - union { - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; - STaskSinkShow showSink; - }; - - // dispatch - union { - STaskDispatcherInplace inplaceDispatcher; - STaskDispatcherFixedEp fixedEpDispatcher; - STaskDispatcherShuffle shuffleDispatcher; - }; - - // state storage - -} SStreamTask; - -SStreamTask* tNewSStreamTask(int64_t streamId); -int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); -int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); -void tFreeSStreamTask(SStreamTask* pTask); - -typedef struct { - // SMsgHead head; - SStreamTask* task; -} SStreamTaskDeployReq; - -typedef struct { - int32_t reserved; -} SStreamTaskDeployRsp; - -typedef struct { - // SMsgHead head; - int64_t streamId; - int32_t taskId; - SArray* data; // SArray -} SStreamTaskExecReq; - -int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq); -void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq); -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq); - -typedef struct { - int32_t reserved; -} SStreamTaskExecRsp; - -typedef struct { - // SMsgHead head; - int64_t streamId; - int64_t version; - SArray* res; // SArray -} SStreamSinkReq; - #pragma pack(pop) #ifdef __cplusplus diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 88b9e22d04..7200e78a41 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -25,6 +25,170 @@ extern "C" { #ifndef _TSTREAM_H_ #define _TSTREAM_H_ +enum { + STREAM_TASK_STATUS__RUNNING = 1, + STREAM_TASK_STATUS__STOP, +}; + +// pipe -> fetch/pipe queue +// merge -> merge queue +// write -> write queue +enum { + TASK_SINK_MSG__SND_PIPE = 1, + TASK_SINK_MSG__SND_MERGE, + TASK_SINK_MSG__VND_PIPE, + TASK_SINK_MSG__VND_MERGE, + TASK_SINK_MSG__VND_WRITE, +}; + +typedef struct { + int32_t nodeId; // 0 for snode + SEpSet epSet; +} SStreamTaskEp; + +typedef struct { + void* inputHandle; + void* executor; +} SStreamRunner; + +typedef struct { + int8_t parallelizable; + char* qmsg; + // followings are not applicable to encoder and decoder + int8_t numOfRunners; + SStreamRunner* runners; +} STaskExec; + +typedef struct { + int8_t reserved; +} STaskDispatcherInplace; + +typedef struct { + int32_t nodeId; + SEpSet epSet; +} STaskDispatcherFixedEp; + +typedef struct { + int8_t hashMethod; + SArray* info; +} STaskDispatcherShuffle; + +typedef struct { + int8_t reserved; + // not applicable to encoder and decoder + SHashObj* pHash; // groupId to tbuid +} STaskSinkTb; + +typedef struct { + int8_t reserved; +} STaskSinkSma; + +typedef struct { + int8_t reserved; +} STaskSinkFetch; + +typedef struct { + int8_t reserved; +} STaskSinkShow; + +enum { + TASK_SOURCE__SCAN = 1, + TASK_SOURCE__SINGLE, + TASK_SOURCE__MULTI, +}; + +enum { + TASK_EXEC__NONE = 1, + TASK_EXEC__EXEC, +}; + +enum { + TASK_DISPATCH__NONE = 1, + TASK_DISPATCH__INPLACE, + TASK_DISPATCH__FIXED, + TASK_DISPATCH__SHUFFLE, +}; + +enum { + TASK_SINK__NONE = 1, + TASK_SINK__TABLE, + TASK_SINK__SMA, + TASK_SINK__FETCH, + TASK_SINK__SHOW, +}; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t status; + + int8_t sourceType; + int8_t execType; + int8_t sinkType; + int8_t dispatchType; + int16_t dispatchMsgType; + int32_t downstreamTaskId; + + // source preprocess + + // exec + STaskExec exec; + + // local sink + union { + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; + STaskSinkShow showSink; + }; + + // dispatch + union { + STaskDispatcherInplace inplaceDispatcher; + STaskDispatcherFixedEp fixedEpDispatcher; + STaskDispatcherShuffle shuffleDispatcher; + }; + + // state storage + +} SStreamTask; + +SStreamTask* tNewSStreamTask(int64_t streamId); +int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); +int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); +void tFreeSStreamTask(SStreamTask* pTask); + +typedef struct { + // SMsgHead head; + SStreamTask* task; +} SStreamTaskDeployReq; + +typedef struct { + int32_t reserved; +} SStreamTaskDeployRsp; + +typedef struct { + // SMsgHead head; + int64_t streamId; + int32_t taskId; + SArray* data; // SArray +} SStreamTaskExecReq; + +int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq); +void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq); +void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq); + +typedef struct { + int32_t reserved; +} SStreamTaskExecRsp; + +typedef struct { + // SMsgHead head; + int64_t streamId; + int64_t version; + SArray* res; // SArray +} SStreamSinkReq; + int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); #ifdef __cplusplus diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 628720142a..09b790e73d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3094,106 +3094,3 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { tfree(pReq->sql); tfree(pReq->ast); } - -SStreamTask *tNewSStreamTask(int64_t streamId) { - SStreamTask *pTask = (SStreamTask *)calloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return NULL; - } - pTask->taskId = tGenIdPI32(); - pTask->streamId = streamId; - pTask->status = STREAM_TASK_STATUS__RUNNING; - /*pTask->qmsg = NULL;*/ - return pTask; -} - -int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { - /*if (tStartEncode(pEncoder) < 0) return -1;*/ - if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1; - - if (pTask->execType == TASK_EXEC__EXEC) { - if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->sinkType != TASK_SINK__NONE) { - // TODO: wrap - if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; - } - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1; - } - - /*tEndEncode(pEncoder);*/ - return pEncoder->pos; -} - -int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { - /*if (tStartDecode(pDecoder) < 0) return -1;*/ - if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1; - - if (pTask->execType == TASK_EXEC__EXEC) { - if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->sinkType != TASK_SINK__NONE) { - if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; - } - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1; - } - - /*tEndDecode(pDecoder);*/ - return 0; -} - -void tFreeSStreamTask(SStreamTask *pTask) { - // TODO - /*free(pTask->qmsg);*/ - /*free(pTask->executor);*/ - /*free(pTask);*/ -} - -#if 0 -int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) { - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/ - return pEncoder->size; -} -int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) { - return pEncoder->size; -} -void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { - taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock); -} -#endif diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index dd2caf1f7f..6cc9fae079 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - mnode scheduler sdb wal transport cjson sync monitor parser + mnode scheduler sdb wal transport cjson sync monitor stream parser ) if(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index b22c41cfde..0585ae5556 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -26,6 +26,7 @@ #include "tlog.h" #include "tmsg.h" #include "trpc.h" +#include "tstream.h" #include "ttimer.h" #include "mnode.h" diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt index 0c63e35e87..f177bda47a 100644 --- a/source/dnode/snode/CMakeLists.txt +++ b/source/dnode/snode/CMakeLists.txt @@ -13,4 +13,5 @@ target_link_libraries( PRIVATE common PRIVATE util PRIVATE qcom + PRIVATE stream ) diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 519b94cf46..2802537dcd 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -22,6 +22,7 @@ #include "tmsg.h" #include "tqueue.h" #include "trpc.h" +#include "tstream.h" #include "snode.h" diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 55bbb1920d..0cc4da2bd1 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -143,3 +143,106 @@ void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) { } void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); } + +SStreamTask* tNewSStreamTask(int64_t streamId) { + SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return NULL; + } + pTask->taskId = tGenIdPI32(); + pTask->streamId = streamId; + pTask->status = STREAM_TASK_STATUS__RUNNING; + /*pTask->qmsg = NULL;*/ + return pTask; +} + +int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { + /*if (tStartEncode(pEncoder) < 0) return -1;*/ + if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->downstreamTaskId) < 0) return -1; + + if (pTask->execType == TASK_EXEC__EXEC) { + if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->sinkType != TASK_SINK__NONE) { + // TODO: wrap + if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1; + } + + /*tEndEncode(pEncoder);*/ + return pEncoder->pos; +} + +int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { + /*if (tStartDecode(pDecoder) < 0) return -1;*/ + if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) return -1; + + if (pTask->execType == TASK_EXEC__EXEC) { + if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->sinkType != TASK_SINK__NONE) { + if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1; + } + + /*tEndDecode(pDecoder);*/ + return 0; +} + +void tFreeSStreamTask(SStreamTask* pTask) { + // TODO + /*free(pTask->qmsg);*/ + /*free(pTask->executor);*/ + /*free(pTask);*/ +} + +#if 0 +int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) { + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/ + return pEncoder->size; +} +int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) { + return pEncoder->size; +} +void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { + taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock); +} +#endif -- GitLab