diff --git a/include/common/tmsg.h b/include/common/tmsg.h index addb84046c1d49fe9e450519e63035266c1e9d5f..d9087f59c6f8062e0cfb5f832eda48c825cc3960 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2589,18 +2589,6 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); } -typedef struct { - int64_t streamId; - int32_t taskId; - int32_t sourceVg; - int64_t sourceVer; - SArray* data; // SArray -} SStreamDispatchReq; - -typedef struct { - int8_t inputStatus; -} SStreamDispatchRsp; - #define TD_AUTO_CREATE_TABLE 0x1 typedef struct { int64_t suid; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93b2e7536054115c48ed2aa6b286650db8cbb8bb..455898585aaec2935d72aab0cdf6dfab6a0aac48 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -200,6 +200,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bca947b84c81817d7d213f2b161053be58ab7281..1604749af8cb8f3073bd4b1ef46e30d45a37ddff 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) if (ref == 0) { taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->dataRef); - taosFreeQitem(pDataSubmit); + // taosFreeQitem(pDataSubmit); } } @@ -286,6 +286,36 @@ typedef struct { int32_t taskId; } SStreamTaskRunReq; +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t sourceTaskId; + int32_t sourceVg; +#if 0 + int64_t sourceVer; +#endif + SArray* data; // SArray +} SStreamDispatchReq; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t inputStatus; +} SStreamDispatchRsp; + +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t sourceTaskId; + int32_t sourceVg; +} SStreamTaskRecoverReq; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t inputStatus; +} SStreamTaskRecoverRsp; + int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamDequeueOutput(SStreamTask* pTask, void** output); @@ -296,6 +326,12 @@ int32_t streamTaskRun(SStreamTask* pTask); int32_t streamTaskHandleInput(SStreamTask* pTask, void* data); +int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp); +int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); +int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3da3d90ae17acf8f20234c0ef72fc5cd882b1af0..f28209f9828062f8ed27f194914b4ac11848735a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -314,6 +314,10 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1c896493137d6246a248ebd7702b95e34dca8dec..23825e6f4a1085f3104414110814853a319c98e8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -121,10 +121,18 @@ int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +#if 0 +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +#endif +int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data); +int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a3b47c0d182bd2ece296eb0417852b56cb89949..1f35ec2650f2cc5c6640e6c92bf308bd162663f9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -105,12 +105,11 @@ static void tdSRowDemo() { } int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { - void* pIter = NULL; - STqExec* pExec = NULL; + void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->execs, pIter); if (pIter == NULL) break; - pExec = (STqExec*)pIter; + STqExec* pExec = (STqExec*)pIter; if (pExec->subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); @@ -274,6 +273,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) } memcpy(data, msg, msgLen); + tqProcessStreamTriggerNew(pTq, data); + +#if 0 SRpcMsg req = { .msgType = TDMT_VND_STREAM_TRIGGER, .pCont = data, @@ -281,6 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) }; tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); +#endif return 0; } @@ -975,12 +978,24 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + pTask->status = TASK_STATUS__IDLE; + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + + pTask->inputQ = taosOpenQueue(); + pTask->outputQ = taosOpenQueue(); + pTask->inputQAll = taosAllocateQall(); + pTask->outputQAll = taosAllocateQall(); + + if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) + goto FAIL; + if (pTask->execType != TASK_EXEC__NONE) { // expand runners pTask->exec.numOfRunners = parallel; pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); if (pTask->exec.runners == NULL) { - return -1; + goto FAIL; } for (int32_t i = 0; i < parallel; i++) { STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); @@ -1002,6 +1017,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { } return 0; +FAIL: + if (pTask->inputQ) taosCloseQueue(pTask->inputQ); + if (pTask->outputQ) taosCloseQueue(pTask->outputQ); + if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); + if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); + if (pTask) taosMemoryFree(pTask); + return -1; } int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { @@ -1053,6 +1075,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo return 0; } +#if 0 int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) { SStreamDataSubmit* pSubmit = NULL; @@ -1103,6 +1126,7 @@ FAIL: } return -1; } +#endif int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { SStreamTaskExecReq req; @@ -1120,25 +1144,28 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) return 0; } -int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { +int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) { void* pIter = NULL; bool failed = false; SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pSubmit == NULL) { failed = true; + goto SET_TASK_FAIL; } pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); if (pSubmit->dataRef == NULL) { failed = true; + goto SET_TASK_FAIL; } - pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK; - pSubmit->sourceVer = ver; - pSubmit->sourceVg = pTq->pVnode->config.vgId; + pSubmit->type = STREAM_INPUT__DATA_SUBMIT; + /*pSubmit->sourceVer = ver;*/ + /*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/ pSubmit->data = pReq; *pSubmit->dataRef = 1; +SET_TASK_FAIL: while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; @@ -1157,7 +1184,18 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { - // TODO dispatch task launch msg to fetch queue + SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) continue; + // TODO: do we need htonl? + pRunReq->head.vgId = pTq->pVnode->config.vgId; + pRunReq->streamId = pTask->streamId; + pRunReq->taskId = pTask->taskId; + SRpcMsg msg = { + .msgType = TDMT_VND_TASK_RUN, + .pCont = pRunReq, + .contLen = sizeof(SStreamTaskRunReq), + }; + tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg); } } else { @@ -1169,11 +1207,53 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { streamDataSubmitRefDec(pSubmit); return 0; } else { + if (pSubmit) { + if (pSubmit->dataRef) { + taosMemoryFree(pSubmit->dataRef); + } + taosFreeQitem(pSubmit); + } return -1; } } -int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // + SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb); + return 0; +} + +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { + SStreamDispatchReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + return 0; +} + +int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { + SStreamTaskRecoverReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + return 0; +} + +int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { + SStreamDispatchRsp* pRsp = pMsg->pCont; + int32_t taskId = pRsp->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); + return 0; +} + +int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { + SStreamTaskRecoverRsp* pRsp = pMsg->pCont; + int32_t taskId = pRsp->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRecoverRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 466c300d9784f5c208979096fd51e29f725ee984..297b518ac76d215b40c5527a2822dd9bf48acba6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,11 +106,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; +#if 0 case TDMT_VND_TASK_WRITE_EXEC: { if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), 0) < 0) { } } break; +#endif case TDMT_VND_ALTER_VNODE: break; default: @@ -162,7 +164,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); - char * msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_VND_FETCH: @@ -181,15 +183,32 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); + + case TDMT_VND_TASK_RUN: { + int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg); + pMsg->pCont = NULL; + return code; + } + case TDMT_VND_TASK_DISPATCH: + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_RECOVER: + return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_DISPATCH_RSP: + return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_VND_TASK_RECOVER_RSP: + return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); + +#if 0 case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); - case TDMT_VND_STREAM_TRIGGER: { + case TDMT_VND_STREAM_TRIGGER:{ // refactor, avoid double free int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); pMsg->pCont = NULL; return code; } +#endif case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: @@ -332,12 +351,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, SDecoder decoder = {0}; int rcode = 0; SVCreateTbBatchReq req = {0}; - SVCreateTbReq * pCreateReq; + SVCreateTbReq *pCreateReq; SVCreateTbBatchRsp rsp = {0}; SVCreateTbRsp cRsp = {0}; char tbName[TSDB_TABLE_FNAME_LEN]; - STbUidStore * pStore = NULL; - SArray * tbUids = NULL; + STbUidStore *pStore = NULL; + SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -521,7 +540,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SDecoder decoder = {0}; SEncoder encoder = {0}; int ret; - SArray * tbUids = NULL; + SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -576,9 +595,9 @@ _exit: static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { SSubmitBlkIter blkIter = {0}; - STSchema * pSchema = NULL; + STSchema *pSchema = NULL; tb_uid_t suid = 0; - STSRow * row = NULL; + STSRow *row = NULL; tInitSubmitBlkIter(msgIter, pBlock, &blkIter); if (blkIter.row == NULL) return 0; @@ -609,8 +628,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; - SMeta * pMeta = pVnode->pMeta; - SSubmitBlk * pBlock = NULL; + SMeta *pMeta = pVnode->pMeta; + SSubmitBlk *pBlock = NULL; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { @@ -624,10 +643,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char } static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - SSubmitReq * pSubmitReq = (SSubmitReq *)pReq; + SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; - SSubmitBlk * pBlock; + SSubmitBlk *pBlock; SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 38b6f2b0e2a13d4f231b1307935a91557f232f48..66a661481e8f751b7a3a030bc7b85b38c75040d5 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -68,7 +68,7 @@ static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMs // get groupId, compute hash value uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); - // + // get node // TODO: optimize search process SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -152,13 +152,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) // exec while (1) { - SSDataBlock* output; + SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { ASSERT(false); } if (output == NULL) break; - taosArrayPush(pRes, &output); + taosArrayPush(pRes, output); } // destroy @@ -189,7 +189,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -209,7 +209,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -231,7 +231,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -253,7 +253,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -392,12 +392,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* // 1.2 enqueue pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pBlock->sourceVg = pReq->sourceVg; - pBlock->sourceVer = pReq->sourceVer; + /*pBlock->sourceVer = pReq->sourceVer;*/ taosWriteQitem(pTask->inputQ, pBlock); // 1.3 rsp by input status SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); pCont->inputStatus = status; + pCont->streamId = pReq->streamId; + pCont->taskId = pReq->sourceTaskId; pRsp->pCont = pCont; pRsp->contLen = sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -439,12 +441,12 @@ int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { return 0; } -int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) { +int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { // return 0; } -int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) { +int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { // return 0; }