From 71984b4268098d6d560fc1f4291653c5315835d5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 20 May 2022 05:43:36 +0800 Subject: [PATCH] refactor(stream): exec --- include/libs/stream/tstream.h | 7 ++ source/dnode/vnode/src/tq/tq.c | 64 ++++++++++++-- source/dnode/vnode/src/tq/tqRead.c | 10 --- source/libs/stream/src/tstream.c | 132 +++++++++++++++++++---------- 4 files changed, 149 insertions(+), 64 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4460327b88..bca947b84c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -107,6 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) if (ref == 0) { taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->dataRef); + taosFreeQitem(pDataSubmit); } } @@ -279,6 +280,12 @@ typedef struct { SArray* res; // SArray } SStreamSinkReq; +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; +} SStreamTaskRunReq; + int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamDequeueOutput(SStreamTask* pTask, void** output); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 212834f7d7..9361c0e6d2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -112,19 +112,18 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { if (pIter == NULL) break; pExec = (STqExec*)pIter; if (pExec->subType == TOPIC_SUB_TYPE__DB) { - if (isAdd) { - continue; - } else { + if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); for (int32_t i = 0; i < sz; i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } - } - for (int32_t i = 0; i < 5; i++) { - int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); - ASSERT(code == 0); + } else { + for (int32_t i = 0; i < 5; i++) { + int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); + ASSERT(code == 0); + } } } return 0; @@ -1059,6 +1058,57 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo return 0; } +int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) { + SStreamDataSubmit* pSubmit = NULL; + + // build data + pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + if (pSubmit == NULL) return -1; + pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); + if (pSubmit->dataRef == NULL) goto FAIL; + *pSubmit->dataRef = 1; + pSubmit->data = data; + pSubmit->type = STREAM_INPUT__DATA_BLOCK; + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = (SStreamTask*)pIter; + if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) { + streamEnqueueDataSubmit(pTask, pSubmit); + // TODO cal back pressure + } + // check run + int8_t execStatus = atomic_load_8(&pTask->status); + if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { + SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq)); + if (pReq == NULL) continue; + // TODO: do we need htonl? + pReq->head.vgId = pTq->pVnode->config.vgId; + pReq->streamId = pTask->streamId; + pReq->taskId = pTask->taskId; + SRpcMsg msg = { + .msgType = 0, + .pCont = pReq, + .contLen = sizeof(SStreamTaskRunReq), + }; + tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg); + } + } + streamDataSubmitRefDec(pSubmit); + + return 0; +FAIL: + if (pSubmit) { + if (pSubmit->dataRef) { + taosMemoryFree(pSubmit->dataRef); + } + taosFreeQitem(pSubmit); + } + return -1; +} + int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { SStreamTaskExecReq req; tDecodeSStreamTaskExecReq(msg, &req); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 28344de897..db5e90743d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -34,21 +34,11 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - // iterate and convert if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (pReadHandle->pBlock == NULL) break; - - // pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); - // pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid); - // pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); - // pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); - // pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); - // pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); } if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 70860be7e1..38b6f2b0e2 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -247,6 +247,19 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { void* data = NULL; taosGetQitem(pTask->inputQAll, &data); if (data == NULL) break; + + streamTaskExecImpl(pTask, data, pRes); + + taosFreeQitem(data); + + if (taosArrayGetSize(pRes) != 0) { + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + resQ->type = STREAM_INPUT__DATA_BLOCK; + resQ->blocks = pRes; + taosWriteQitem(pTask->outputQ, resQ); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) goto FAIL; + } } atomic_store_8(&pTask->status, TASK_STATUS__IDLE); @@ -298,62 +311,66 @@ int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) { } // dispatch - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - SRpcMsg dispatchMsg = {0}; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { - ASSERT(0); - return -1; - } + // TODO dispatch guard + int8_t outputStatus = atomic_load_8(&pTask->outputStatus); + if (outputStatus == TASK_OUTPUT_STATUS__NORMAL) { + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + SRpcMsg dispatchMsg = {0}; + if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { + ASSERT(0); + return -1; + } - int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { - qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || - pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { - qType = MERGE_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { - qType = WRITE_QUEUE; - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } + int32_t qType; + if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { + qType = FETCH_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || + pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { + qType = MERGE_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { + qType = WRITE_QUEUE; + } else { + ASSERT(0); + } + tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - tmsgSendReq(pEpSet, &dispatchMsg); + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (pShuffleRes == NULL) { - return -1; - } + tmsgSendReq(pEpSet, &dispatchMsg); - int32_t sz = taosArrayGetSize(pRes); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRes, i); - SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); - if (pArray == NULL) { - pArray = taosArrayInit(0, sizeof(SSDataBlock)); + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (pShuffleRes == NULL) { + return -1; + } + + int32_t sz = taosArrayGetSize(pRes); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pRes, i); + SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); if (pArray == NULL) { - return -1; + pArray = taosArrayInit(0, sizeof(SSDataBlock)); + if (pArray == NULL) { + return -1; + } + taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); } - taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); + taosArrayPush(pArray, pDataBlock); } - taosArrayPush(pArray, pDataBlock); - } - if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { - return -1; - } + if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { + return -1; + } - } else { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + } else { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + } } } return 0; @@ -406,11 +423,32 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream return 0; } +int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { + atomic_store_8(&pTask->inputStatus, pRsp->inputStatus); + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + // TODO: init recover timer + } + // continue dispatch + streamTaskSink(pTask, pMsgCb); + return 0; +} + +int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { + streamTaskExec2(pTask, pMsgCb); + streamTaskSink(pTask, pMsgCb); + return 0; +} + int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) { // return 0; } +int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) { + // + return 0; +} + int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) { SArray* pRes = NULL; // source -- GitLab