diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 051ee34644ce214d5be67109944233a868b66a58..36a489eb59a2e7aeedf5523d778b56094e43e78b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -192,7 +192,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4efddde9351729ec5a0ba8b9ffed3cb78f2c12fe..8be9bbbebd6871b8318b910d48d260e4dcb35e0b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -82,8 +82,12 @@ typedef struct { SHashObj* pHash; // groupId to tbuid } STaskSinkTb; +typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); + typedef struct { - int8_t reserved; + int64_t smaId; + // following are not applicable to encoder and decoder + FSmaHandle* smaHandle; } STaskSinkSma; typedef struct { @@ -156,7 +160,8 @@ typedef struct { STaskDispatcherShuffle shuffleDispatcher; }; - // state storage + // application storage + void* ahandle; } SStreamTask; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8eaca6853d6bfad12136d9fd033f084766308f22..5d00cca76e82f6e23651fcdc27feb25fac10b378 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL: } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { - printf("call update ep %d\n", epoch); + /*printf("call update ep %d\n", epoch);*/ bool set = false; int32_t sz = taosArrayGetSize(pRsp->topics); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 97d829571fe4438e46b59c4a677d296b789d9b81..1682c6043dd08115a0e670a0eca1568765a6af16 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 42951beca2e414611543d92e08b86d19f6247636..416061bf340ff5d387d1e2f02a7869f7d169906c 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5ba931ccbf5c5986ec41d7cbf5e6e5e..e7cdd34a7e9e96b71b6b0a50dc771ee27b6afd41 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 24f2a5df227f80fc9bb62c144f351e711f04f5b0..1b3564924a7a8349ecb2d7cbc83278745d557fcd 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, sz) < 0) return -1; for (int32_t i = 0; i < sz; i++) { - SArray *pArray = taosArrayGet(pObj->tasks, i); + SArray *pArray = taosArrayGetP(pObj->tasks, i); int32_t innerSz = taosArrayGetSize(pArray); if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { - SStreamTask *pTask = taosArrayGet(pArray, j); + SStreamTask *pTask = taosArrayGetP(pArray, j); if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; } } @@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { int32_t sz; if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (sz != 0) { - pObj->tasks = taosArrayInit(sz, sizeof(SArray)); + pObj->tasks = taosArrayInit(sz, sizeof(void *)); for (int32_t i = 0; i < sz; i++) { int32_t innerSz; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; - SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); + SArray *pArray = taosArrayInit(innerSz, sizeof(void *)); for (int32_t j = 0; j < innerSz; j++) { - SStreamTask task; - if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; - taosArrayPush(pArray, &task); + SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) return -1; + if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1; + taosArrayPush(pArray, &pTask); } - taosArrayPush(pObj->tasks, pArray); + taosArrayPush(pObj->tasks, &pArray); } } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 69ee1a569693329765d8bcf4892555fc1d3acf44..697811cd04d6e60d5d506113f0839a4702e5e2b8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // only for inplace pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; + if (smaId != -1) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = smaId; + } } else { pTask->sinkType = TASK_SINK__NONE; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 146975aa38851253bc1ca8f656de672a11dc666f..5c62cfa0f2c55bb9e592229bd546530a71acc469 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {} static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; + int32_t size = + sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); goto _OVER; } - + pStream = mndAcquireStream(pMnode, createReq.name); if (pStream != NULL) { mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c02fec0a5f10c8dd3f86e46af6946fced30e888e..bbb2f6428223c5169b7120bdaf045555884e6c52 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } @@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8d256995c68fd196f8c5df96823816bd48b78cb3..7b0606512c91183383a1ff707a159721a32f0359 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -198,10 +198,13 @@ int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); +// sma +void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4661668cbed50cd6914fa8c22846b290b8cbf803..55202335e05ff020ad8a3298cef0cd50fa162640 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (tqExpandTask(pTq, pTask, 4) < 0) { ASSERT(0); } + pTask->ahandle = pTq->pVnode; taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); @@ -497,11 +498,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { return 0; } -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { - char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead)); - +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { SStreamTaskExecReq req; - tDecodeSStreamTaskExecReq(msgstr, &req); + tDecodeSStreamTaskExecReq(msg, &req); int32_t taskId = req.taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 94e183f5256a92f926870eba892a71e85b91872d..74d7558e0d3e33fa09122f48f4f8e60effe95c41 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in fetch queue is processing"); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); @@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); - case TDMT_VND_TASK_EXEC: case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, pMsg); + return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen); case TDMT_VND_STREAM_TRIGGER: return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); case TDMT_VND_QUERY_HEARTBEAT: diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 02d4524bce2b8fce49e26498375692571219429f..98256e6b248f4a62c6a810f47e6004a0a926f967 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; + case TDMT_VND_TASK_WRITE_EXEC: { + if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + } + } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA #if 0 SSmaCfg vCreateSmaReq = {0}; diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 31a06a9d9a6e464a9b6f223712b00c7659133b0c..028e310a25d7f46729a9b2e7006ed11e67cd9f1f 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in if (pTask->sinkType == TASK_SINK__TABLE) { // } else if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType != TASK_SINK__NONE) { - // TODO: wrap + if (pTask->sinkType == TASK_SINK__TABLE) { if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SHOW) { + if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { @@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType != TASK_SINK__NONE) { + if (pTask->sinkType == TASK_SINK__TABLE) { if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SMA) { + if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__FETCH) { + if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; + } else if (pTask->sinkType == TASK_SINK__SHOW) { + if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1; + } else { + ASSERT(pTask->sinkType == TASK_SINK__NONE); } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {