diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 70145b434d3cfb23d68a80f4964dd3b62d63d502..36966779de8af87a6904e3fdac7c921f36448043 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -241,6 +241,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e2616567f51fe82798c4e0fb428dde8bf0cb457a..1c3f905e23a1a401cd662c20a495fa61d2e49ade 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -46,7 +46,7 @@ enum { TASK_STATUS__DROPPING, TASK_STATUS__FAIL, TASK_STATUS__STOP, - TASK_STATUS__RECOVER_DOWNSTREAM, + TASK_STATUS__WAIT_DOWNSTREAM, TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, @@ -332,7 +332,10 @@ typedef struct SStreamTask { SStreamState* pState; // do not serialize - int32_t recoverWaitingChild; + int32_t recoverTryingDownstream; + int32_t recoverWaitingUpstream; + int64_t checkReqId; + SArray* checkReqIds; // shuffle } SStreamTask; @@ -418,7 +421,10 @@ typedef struct { typedef struct { int64_t streamId; - int32_t taskId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; int8_t inputStatus; } SStreamDispatchRsp; @@ -441,60 +447,40 @@ typedef struct { } SStreamRetrieveRsp; typedef struct { - SMsgHead msgHead; - int64_t streamId; - int32_t taskId; -} SStreamRecoverStep1Req, SStreamRecoverStep2Req; - -typedef struct { + int64_t reqId; int64_t streamId; - int32_t taskId; + int32_t upstreamNodeId; + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; int32_t childId; -} SStreamRecoverFinishReq; +} SStreamTaskCheckReq; -int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); -int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); - -#if 0 typedef struct { + int64_t reqId; int64_t streamId; - int32_t taskId; - int32_t upstreamTaskId; int32_t upstreamNodeId; -} SStreamTaskRecoverReq; - -typedef struct { - int64_t streamId; - int32_t rspTaskId; - int32_t reqTaskId; - int8_t inputStatus; -} SStreamTaskRecoverRsp; - -int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq); -int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq); - -int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp); -int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp); + int32_t upstreamTaskId; + int32_t downstreamNodeId; + int32_t downstreamTaskId; + int32_t childId; + int8_t status; +} SStreamTaskCheckRsp; typedef struct { - int64_t streamId; - int32_t taskId; -} SMStreamTaskRecoverReq; + SMsgHead msgHead; + int64_t streamId; + int32_t taskId; +} SStreamRecoverStep1Req, SStreamRecoverStep2Req; typedef struct { int64_t streamId; int32_t taskId; -} SMStreamTaskRecoverRsp; - -int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq); -int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq); - -int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp); -int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp); + int32_t childId; +} SStreamRecoverFinishReq; -int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); -int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); -#endif +int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); +int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); typedef struct { int64_t streamId; @@ -509,20 +495,18 @@ typedef struct { SArray* checkpointVer; // SArray } SStreamRecoverDownstreamRsp; +int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); +int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); + +int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); +int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); + int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq); int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq); int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp); int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp); -typedef struct { - int64_t streamId; - int32_t taskId; - int32_t waitingRspCnt; - int32_t totReq; - SArray* info; // SArray*> -} SStreamRecoverStatus; - int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); @@ -533,7 +517,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp); +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); @@ -544,6 +528,10 @@ int32_t streamSchedExec(SStreamTask* pTask); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); // recover and fill history +int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); +int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version); +int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq); +int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); // common int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 563820f60b279739d32e85fda774c1cb89fc41b1..2365469066b414e9504f4b63040726cfa20c4d8f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -635,9 +635,12 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) +// stream +#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) + // TDLite -#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x4100) -#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x4101) +#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) +#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x5101) #ifdef __cplusplus } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ba639476d841be8ebe0e7114c2f8c662824079f1..8e7faf48f64054ca7a1de34de3b33bf6e1deb37a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1756,7 +1756,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { - tscDebug("consumer:%" PRId64 ", return rsp", tmq->consumerId); + tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId); diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 6a705275412211ae10a9f064fde247a20a011d33..8300b13476d8fa4126527e87d77f8652c59b5de4 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -58,7 +58,7 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dTrace("msg:%p, get from snode-stream queue", pMsg); int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg); if (code < 0) { - dGError("snd, msg:%p failed to process stream since %s", pMsg, terrstr(code)); + dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code)); smSendRsp(pMsg, terrno); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index e2812e0d312704633f4ec3519ddffaa98b867700..6faa3354eedc63ab7cc199306c5210b8408be147 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -403,7 +403,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; @@ -412,6 +411,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 71e2ece7333d80bf910103521a3164a4ec5c9bef..3dfcb8a708267e7cff07b7fb9614bea72407bf7c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -86,7 +86,8 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code)); + dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), + terrstr(code)); vmSendRsp(pMsg, code); } @@ -218,7 +219,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp return code; } -int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); } +int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); +} int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7ad24be258ee23b547543f0296cc27ea281ee89f..4334bcfb48459ecbbfa988f481b62fc4e6fc766c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -155,7 +155,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { if (terrno != 0) code = terrno; - dGTrace("msg:%p, failed to process since %s", pMsg, terrstr()); + dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); if (IsReq(pRpc)) { SRpcMsg rsp = {.code = code, .info = pRpc->info}; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9f433f33225ae09c594b0d97806e9295ef4c5347..36ba0aaf877eeef621b5d94aba11650fe6f75cc5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -705,7 +705,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-stream"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 74f2b1288e9a296592fc7fec2162604031539056..a62f15f978bba618668ff2df5d0146db8c40f531 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -612,14 +612,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndReleaseSubscribe(pMnode, pSub); } - // TODO replace assert with error check - ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); + if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) { + mError("mq rebalance internal error"); + } // if add more consumer to balanced subscribe, // possibly no vg is changed /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ - // TODO replace assert with error check if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped"); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d34159d312e9eb6340bcdd4103e90b283ea0fcce..08ea4808c54513c71e01a90204f2e6480c2c802f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -231,10 +231,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = pRsp->taskId; + int32_t taskId = pRsp->upstreamTaskId; SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); if (pTask) { - streamProcessDispatchRsp(pTask, pRsp); + streamProcessDispatchRsp(pTask, pRsp, pMsg->code); return 0; } else { return -1; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 020d2b6049d35072600f942929136df410f21b83..370103c222b5d4a34b3865f7727b3e5f5035b745 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -158,7 +158,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid); -void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow); +void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); @@ -240,7 +240,7 @@ bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); -void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9ec8bb2cfd9952dc48b510c54d856869b9ef2055..44ecf64419c005d4c900bc40b8103e8b8e534eb2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -150,19 +150,19 @@ typedef struct { int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); -int tsdbClose(STsdb** pTsdb); -int32_t tsdbBegin(STsdb* pTsdb); -int32_t tsdbCommit(STsdb* pTsdb); -int32_t tsdbFinishCommit(STsdb* pTsdb); -int32_t tsdbRollbackCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); -int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); -int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); -int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, - SSubmitBlkRsp* pRsp); -int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback); +int tsdbClose(STsdb** pTsdb); +int32_t tsdbBegin(STsdb* pTsdb); +int32_t tsdbCommit(STsdb* pTsdb); +int32_t tsdbFinishCommit(STsdb* pTsdb); +int32_t tsdbRollbackCommit(STsdb* pTsdb); +int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); +int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); +int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); +int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, + SSubmitBlkRsp* pRsp); +int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); +int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); // tq int tqInit(); @@ -183,13 +183,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); -// int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); -// int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 094db9ebd099e8e218936d3bb1c71267eda98a37..9c377fe7f579756c0a3e374e57d337ad276a519a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -896,6 +896,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->startVer = ver; // expand executor + if (pTask->fillHistory) { + pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + } + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -911,9 +915,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); - if (pTask->fillHistory) { - pTask->taskStatus = TASK_STATUS__RECOVER_PREPARE; - } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -947,11 +948,90 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId, - pTask->selfChildId); + tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId, + pTask->selfChildId, pTask->taskLevel); return 0; } +int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamTaskCheckReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeSStreamTaskCheckReq(&decoder, &req); + tDecoderClear(&decoder); + int32_t taskId = req.downstreamTaskId; + SStreamTaskCheckRsp rsp = { + .reqId = req.reqId, + .streamId = req.streamId, + .childId = req.childId, + .downstreamNodeId = req.downstreamNodeId, + .downstreamTaskId = req.downstreamTaskId, + .upstreamNodeId = req.upstreamNodeId, + .upstreamTaskId = req.upstreamTaskId, + }; + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + rsp.status = 1; + } else { + rsp.status = 0; + } + + tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", + rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + + SEncoder encoder; + int32_t code; + int32_t len; + tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); + if (code < 0) { + ASSERT(0); + } + void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, (uint8_t*)abuf, len); + tEncodeSStreamTaskCheckRsp(&encoder, &rsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = { + .code = 0, + .pCont = buf, + .contLen = sizeof(SMsgHead) + len, + .info = pMsg->info, + }; + + tmsgSendRsp(&rspMsg); + return 0; +} + +int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + int32_t code; + SStreamTaskCheckRsp rsp; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { + tDecoderClear(&decoder); + return -1; + } + tDecoderClear(&decoder); + + tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", + rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId); + if (pTask == NULL) { + return -1; + } + + return streamProcessTaskCheckRsp(pTask, &rsp, version); +} + int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t code; #if 0 @@ -982,37 +1062,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg // 3.go through recover steps to fill history if (pTask->fillHistory) { - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - streamSetParamForRecover(pTask); - streamSourceRecoverPrepareStep1(pTask, version); - - SStreamRecoverStep1Req req; - streamBuildSourceRecover1Req(pTask, &req); - int32_t len = sizeof(SStreamRecoverStep1Req); - - void* serializedReq = rpcMallocCont(len); - if (serializedReq == NULL) { - return -1; - } - - memcpy(serializedReq, &req, len); - - SRpcMsg rpcMsg = { - .contLen = len, - .pCont = serializedReq, - .msgType = TDMT_VND_STREAM_RECOVER_STEP1, - }; - - if (tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg) < 0) { - /*ASSERT(0);*/ - } - - } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - streamSetParamForRecover(pTask); - streamAggRecoverPrepare(pTask); - } else if (pTask->taskLevel == TASK_LEVEL__SINK) { - // do nothing - } + streamTaskCheckDownstream(pTask, version); } return 0; @@ -1268,7 +1318,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) { + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); continue; } @@ -1335,10 +1385,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t taskId = pRsp->taskId; + int32_t taskId = ntohl(pRsp->upstreamTaskId); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + tqDebug("recv dispatch rsp, code: %x", pMsg->code); if (pTask) { - streamProcessDispatchRsp(pTask, pRsp); + streamProcessDispatchRsp(pTask, pRsp, pMsg->code); return 0; } else { return -1; @@ -1379,12 +1430,12 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { - STQ* pTq = pVnode->pTq; - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; +int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { + STQ* pTq = pVnode->pTq; + SMsgHead* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; SStreamDispatchReq req; SDecoder decoder; @@ -1407,16 +1458,45 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { streamProcessDispatchReq(pTask, &req, &rsp, false); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); - return; + return 0; } + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + FAIL: - if (pMsg->info.handle == NULL) return; + if (pMsg->info.handle == NULL) return -1; + + SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (pRspHead == NULL) { + SRpcMsg rsp = { + .code = TSDB_CODE_OUT_OF_MEMORY, + .info = pMsg->info, + }; + tqDebug("send dispatch error rsp, code: %x", code); + tmsgSendRsp(&rsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + return -1; + } + + pRspHead->vgId = htonl(req.upstreamNodeId); + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); + pRsp->streamId = htobe64(req.streamId); + pRsp->upstreamTaskId = htonl(req.upstreamTaskId); + pRsp->upstreamNodeId = htonl(req.upstreamNodeId); + pRsp->downstreamNodeId = htonl(pVnode->config.vgId); + pRsp->downstreamTaskId = htonl(req.taskId); + pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; + SRpcMsg rsp = { .code = code, .info = pMsg->info, + .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), + .pCont = pRspHead, }; + tqDebug("send dispatch error rsp, code: %x", code); tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); + return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d6c77afa669f41605760007be82e59b39a5c659e..f1c4ab76d1ac55b68a05eac09b47be2d3da5da8c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -229,8 +229,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TQ */ case TDMT_VND_TMQ_SUBSCRIBE: - if (tqProcessSubscribeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) { goto _err; } break; @@ -240,26 +239,22 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } break; case TDMT_VND_TMQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_TMQ_ADD_CHECKINFO: - if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) { goto _err; } break; case TDMT_VND_TMQ_DEL_CHECKINFO: - if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) { goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) { goto _err; } } break; @@ -273,6 +268,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } } break; + case TDMT_STREAM_TASK_CHECK_RSP: { + if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) { + goto _err; + } + } break; case TDMT_VND_ALTER_CONFIRM: vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp); break; @@ -388,10 +388,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); -#if 0 +#if 1 case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); #endif + case TDMT_STREAM_TASK_CHECK: + return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e54b889a7af00c62b11b043159913465daf77ef0..d596eab2f8ca57227cb9bd0ba5a950843e3ff0ef 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1884,11 +1884,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { - pTSInfo->cond.startVersion = -1; + pTSInfo->cond.startVersion = 0; pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; + qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, + pTSInfo->cond.endVersion); } else { pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; + qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion, + pTSInfo->cond.endVersion); } /*resetTableScanInfo(pTSInfo, pWin);*/ @@ -1905,11 +1909,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pBlock != NULL) { calBlockTbName(&pInfo->tbnameCalSup, pBlock); updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); + qDebug("stream recover scan get block, rows %d", pBlock->info.rows); return pBlock; } pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - pTSInfo->cond.startVersion = 0; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; + + pTSInfo->cond.startVersion = -1; pTSInfo->cond.endVersion = -1; return NULL; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index a8f7184bb2f3f5ca9174805dd7d8df0aac390334..6a3bdb59c9277d392ae82a6bed15674672cf56c8 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -43,6 +43,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); +int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); + int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ee317d0751c90f4f63669a228ce35f640b5bde1b..b71562cf454f0e7be57ddccc00ae4fcdf410c88b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -135,8 +135,11 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->inputStatus = status; - pCont->streamId = pReq->streamId; - pCont->taskId = pReq->upstreamTaskId; + pCont->streamId = htobe64(pReq->streamId); + pCont->upstreamNodeId = htonl(pReq->upstreamNodeId); + pCont->upstreamTaskId = htonl(pReq->upstreamTaskId); + pCont->downstreamNodeId = htonl(pTask->nodeId); + pCont->downstreamTaskId = htonl(pTask->taskId); pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -203,10 +206,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - qDebug("task %d receive dispatch rsp", pTask->taskId); + qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d2876a22c654f972db77558c2f40a71b0677f221..ad342edfa03054d1caa043bbb77197f6cd1c738b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } +int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + int32_t tlen; + tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code); + if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + msg.contLen = tlen + sizeof(SMsgHead); + msg.pCont = buf; + msg.msgType = TDMT_STREAM_TASK_CHECK; + + qDebug("dispatch from task %d to task %d node %d: check msg", pTask->taskId, pReq->downstreamTaskId, nodeId); + + tmsgSendReq(pEpSet, &msg); + + return 0; +FAIL: + if (buf) rpcFreeCont(buf); + return code; +} + int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; @@ -243,7 +283,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov tmsgSendReq(pEpSet, &msg); - code = 0; + qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->taskId, pReq->taskId, vgId); + return 0; FAIL: if (buf) rpcFreeCont(buf); @@ -279,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from task %d to task %d node %d", pTask->taskId, pReq->taskId, vgId); + qDebug("dispatch from task %d to task %d node %d: data msg", pTask->taskId, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 629333b4b4cb9890b5783e511902e31b1447bdb9..46fab536591f128282f22a7c7ef1a9adb2e441df 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -202,83 +202,83 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; - void* data = NULL; + void* input = NULL; while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { qDebug("stream task exec over, queue empty, task: %d", pTask->taskId); break; } - if (data == NULL) { - data = qItem; + if (input == NULL) { + input = qItem; streamQueueProcessSuccess(pTask->inputQueue); if (pTask->taskLevel == TASK_LEVEL__SINK) { break; } } else { void* newRet; - if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) { + if ((newRet = streamMergeQueueItem(input, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { batchCnt++; - data = newRet; + input = newRet; streamQueueProcessSuccess(pTask->inputQueue); } } } if (pTask->taskStatus == TASK_STATUS__DROPPING) { - if (data) streamFreeQitem(data); + if (input) streamFreeQitem(input); return 0; } - if (data == NULL) { + if (input == NULL) { break; } if (pTask->taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, data); + ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, input); continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt); - streamTaskExecImpl(pTask, data, pRes); + streamTaskExecImpl(pTask, input, pRes); qDebug("stream task %d exec end", pTask->taskId); if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(data); + streamFreeQitem(input); return -1; } qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; - } else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; + } else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; } if (streamTaskOutput(pTask, qRes) < 0) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(data); + streamFreeQitem(input); taosFreeQitem(qRes); return -1; } } else { taosArrayDestroy(pRes); } - streamFreeQitem(data); + streamFreeQitem(input); } return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index adeb797721e8e24d7bdb2c89a1a8f0dae67b8948..2a2784afea67386ab2b75d8230f107f596f79efd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -15,6 +15,148 @@ #include "streamInc.h" +int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { + qDebug("task %d at node %d launch recover", pTask->taskId, pTask->nodeId); + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE); + streamSetParamForRecover(pTask); + streamSourceRecoverPrepareStep1(pTask, version); + + SStreamRecoverStep1Req req; + streamBuildSourceRecover1Req(pTask, &req); + int32_t len = sizeof(SStreamRecoverStep1Req); + + void* serializedReq = rpcMallocCont(len); + if (serializedReq == NULL) { + return -1; + } + + memcpy(serializedReq, &req, len); + + SRpcMsg rpcMsg = { + .contLen = len, + .pCont = serializedReq, + .msgType = TDMT_VND_STREAM_RECOVER_STEP1, + }; + + if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { + /*ASSERT(0);*/ + } + + } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + streamSetParamForRecover(pTask); + streamAggRecoverPrepare(pTask); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + } + return 0; +} + +// checkstatus +int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { + SStreamTaskCheckReq req = { + .streamId = pTask->streamId, + .upstreamTaskId = pTask->taskId, + .upstreamNodeId = pTask->nodeId, + .childId = pTask->selfChildId, + }; + // serialize + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = tGenIdPI64(); + req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + pTask->checkReqId = req.reqId; + + qDebug("task %d at node %d check downstream task %d at node %d", pTask->taskId, pTask->nodeId, req.downstreamTaskId, + req.downstreamNodeId); + streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); + pTask->recoverTryingDownstream = vgSz; + pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); + + for (int32_t i = 0; i < vgSz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.reqId = tGenIdPI64(); + taosArrayPush(pTask->checkReqIds, &req.reqId); + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->taskId, pTask->nodeId, + req.downstreamTaskId, req.downstreamNodeId); + streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } else { + qDebug("task %d at node %d direct launch recover since no downstream", pTask->taskId, pTask->nodeId); + streamTaskLaunchRecover(pTask, version); + } + return 0; +} + +int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { + SStreamTaskCheckReq req = { + .reqId = pRsp->reqId, + .streamId = pRsp->streamId, + .upstreamTaskId = pRsp->upstreamTaskId, + .upstreamNodeId = pRsp->upstreamNodeId, + .downstreamTaskId = pRsp->downstreamTaskId, + .downstreamNodeId = pRsp->downstreamNodeId, + .childId = pRsp->childId, + }; + qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->taskId, pTask->nodeId, + req.downstreamTaskId, req.downstreamNodeId); + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < vgSz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (pVgInfo->taskId == req.downstreamTaskId) { + streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); + } + } + } + return 0; +} + +int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { + return atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL; +} + +int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { + qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); + if (pRsp->status == 1) { + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + bool found = false; + for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) { + int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); + if (reqId == pRsp->reqId) { + found = true; + break; + } + } + if (!found) return -1; + int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); + ASSERT(left >= 0); + if (left == 0) { + taosArrayDestroy(pTask->checkReqIds); + streamTaskLaunchRecover(pTask, version); + } + } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pRsp->reqId != pTask->checkReqId) return -1; + streamTaskLaunchRecover(pTask, version); + } else { + ASSERT(0); + } + } else { + streamRecheckOneDownstream(pTask, pRsp); + } + return 0; +} + // common int32_t streamSetParamForRecover(SStreamTask* pTask) { void* exec = pTask->exec.executor; @@ -86,10 +228,7 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { void* exec = pTask->exec.executor; - /*if (qStreamSetParamForRecover(exec) < 0) {*/ - /*return -1;*/ - /*}*/ - pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo); + pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); return 0; } @@ -107,7 +246,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { if (pTask->taskLevel == TASK_LEVEL__AGG) { - int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1); + int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1); ASSERT(left >= 0); if (left == 0) { streamAggChildrenRecoverFinish(pTask); @@ -116,95 +255,76 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { return 0; } -int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { +int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} -int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} -#if 0 -int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) { +int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; tEndDecode(pDecoder); return 0; } -int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) { +int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) { +int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1; - if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; + if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; tEndDecode(pDecoder); return 0; } -int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) { +int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } - -int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) { +int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; tEndDecode(pDecoder); return 0; } -#endif int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) { if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1; @@ -248,308 +368,3 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh } return 0; } - -#if 0 -int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) { - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - return 0; -} - -int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) { - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - return 0; -} - -int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) { - if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; - int32_t sz = taosArrayGetSize(pRsp->checkpointVer); - if (tEncodeI32(pEncoder, sz) < 0) return -1; - for (int32_t i = 0; i < sz; i++) { - SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i); - if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1; - } - return 0; -} - -int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) { - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1; - int32_t sz; - if (tDecodeI32(pDecoder, &sz) < 0) return -1; - pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo)); - if (pRsp->checkpointVer == NULL) return -1; - for (int32_t i = 0; i < sz; i++) { - SStreamCheckpointInfo info; - if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0) return -1; - taosArrayPush(pRsp->checkpointVer, &info); - } - return 0; -} -#endif - -int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { -#if 0 - void* buf = NULL; - - ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); - - SStreamMultiVgCheckpointInfo checkpoint; - checkpoint.checkpointId = atomic_fetch_add_32(&pTask->nextCheckId, 1); - checkpoint.checkTs = taosGetTimestampMs(); - checkpoint.streamId = pTask->streamId; - checkpoint.taskId = pTask->taskId; - checkpoint.checkpointVer = pTask->checkpointInfo; - - int32_t len; - int32_t code; - tEncodeSize(tEncodeSStreamMultiVgCheckpointInfo, &checkpoint, len, code); - if (code < 0) { - return -1; - } - - buf = taosMemoryCalloc(1, len); - if (buf == NULL) { - return -1; - } - SEncoder encoder; - tEncoderInit(&encoder, buf, len); - tEncodeSStreamMultiVgCheckpointInfo(&encoder, &checkpoint); - tEncoderClear(&encoder); - - SStreamCheckpointKey key = { - .taskId = pTask->taskId, - .checkpointId = checkpoint.checkpointId, - }; - - if (tdbTbUpsert(pMeta->pStateDb, &key, sizeof(SStreamCheckpointKey), buf, len, &pMeta->txn) < 0) { - ASSERT(0); - goto FAIL; - } - - int32_t sz = taosArrayGetSize(pTask->checkpointInfo); - for (int32_t i = 0; i < sz; i++) { - SStreamCheckpointInfo* pCheck = taosArrayGet(pTask->checkpointInfo, i); - pCheck->stateSaveVer = pCheck->stateProcessedVer; - } - - taosMemoryFree(buf); - return 0; -FAIL: - if (buf) taosMemoryFree(buf); - return -1; -#endif - return 0; -} - -int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { -#if 0 - void* pVal = NULL; - int32_t vLen = 0; - if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, pVal, vLen); - SStreamMultiVgCheckpointInfo aggCheckpoint; - tDecodeSStreamMultiVgCheckpointInfo(&decoder, &aggCheckpoint); - tDecoderClear(&decoder); - - pTask->nextCheckId = aggCheckpoint.checkpointId + 1; - pTask->checkpointInfo = aggCheckpoint.checkpointVer; -#endif - return 0; -} - -int32_t streamSaveSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); - return streamSaveStateInfo(pMeta, pTask); -} - -int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); - return streamLoadStateInfo(pMeta, pTask); -} - -int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); - // TODO save and copy state - - // save state info - if (streamSaveStateInfo(pMeta, pTask) < 0) { - return -1; - } - return 0; -} - -#if 0 -int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) { - int32_t taskId = pVgInfo->taskId; - int32_t nodeId = pVgInfo->vgId; - SStreamRecoverDownstreamReq req = { - .streamId = pTask->taskId, - .downstreamTaskId = taskId, - .taskId = pTask->taskId, - }; - int32_t tlen; - int32_t code; - tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code); - if (code < 0) { - return -1; - } - void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen); - if (buf == NULL) { - return -1; - } - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) { - tEncoderClear(&encoder); - taosMemoryFree(buf); - return -1; - } - tEncoderClear(&encoder); - - ((SMsgHead*)buf)->vgId = htonl(nodeId); - SRpcMsg msg = { - .pCont = buf, .contLen = sizeof(SMsgHead) + tlen, - /*.msgType = */ - }; - tmsgSendReq(&pVgInfo->epSet, &msg); - - return 0; -} - -int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) { - // set self status to recover_phase1 - SStreamRecoverStatus* pRecover; - atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM); - pRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t)); - if (pRecover == NULL) { - pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus)); - if (pRecover == NULL) { - return -1; - } - pRecover->info = taosArrayInit(0, sizeof(void*)); - if (pRecover->info == NULL) { - taosMemoryFree(pRecover); - return -1; - } - taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*)); - } - - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - pRecover->totReq = 1; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); - pRecover->totReq = numOfDownstream; - for (int32_t i = 0; i < numOfDownstream; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i); - streamFetchRecoverStatus(pTask, pVgInfo); - } - } else { - ASSERT(0); - } - return 0; -} -#endif - -#if 0 -int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) { - // if failed, set timer and retry - // if successful - int32_t taskId = pTask->taskId; - SStreamRecoverStatus* pRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t)); - if (pRecover == NULL) { - return -1; - } - - taosArrayPush(pRecover->info, &pRsp->checkpointVer); - - int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1); - ASSERT(leftRsp >= 0); - - if (leftRsp == 0) { - ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq); - - // srcNodeId -> SStreamCheckpointInfo* - SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (pFinalChecks == NULL) return -1; - - for (int32_t i = 0; i < pRecover->totReq; i++) { - SArray* pChecks = taosArrayGetP(pRecover->info, i); - int32_t sz = taosArrayGetSize(pChecks); - for (int32_t j = 0; j < sz; j++) { - SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j); - SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t)); - if (pCheck == NULL) { - pCheck = taosMemoryCalloc(1, sizeof(SStreamCheckpointInfo)); - pCheck->srcNodeId = pOneCheck->srcNodeId; - pCheck->srcChildId = pOneCheck->srcChildId; - pCheck->stateProcessedVer = pOneCheck->stateProcessedVer; - taosHashPut(pFinalChecks, &pCheck->srcNodeId, sizeof(int32_t), &pCheck, sizeof(void*)); - } else { - pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer); - } - } - } - // load local state - // - // recover - // - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); - if (streamPipelineExec(pTask, 10000, true) < 0) { - return -1; - } - } - taosHashCleanup(pFinalChecks); - taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); - } - return 0; -} - -int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); - // recover sink level - // after all sink level recovered - // choose suitable state to recover - return 0; -} - -int32_t streamSaveSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - // TODO: save and copy state - return 0; -} - -int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - // if totLevel == 3 - // fetch agg state - // recover from local state to agg state, not send msg - // recover from agg state to most recent log v1 - // enable input queue, set status recover_phase2 - // recover from v1 to queue msg v2, set status normal - - // if totLevel == 2 - // fetch sink state - // recover from local state to sink state v1, send msg - // enable input queue, set status recover_phase2 - // recover from v1 to queue msg v2, set status normal - return 0; -} - -int32_t streamRecoverTask(SStreamTask* pTask) { - // - return 0; -} -#endif diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bbdf43c85e4c26cfdc0be35cf5cfc6f9268a4be4..7afb3c726060957ebb631510fed88ba2b8eef9e1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -633,6 +633,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") +// stream +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") + // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory") diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1a80aa0681f1eabf46d23be925e0acd90624267d..85845553330583cf7b65bb46caa9293653b2606b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -235,6 +235,8 @@ ./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/fillHistoryBasic1.sim +./test.sh -f tsim/stream/fillHistoryBasic2.sim +./test.sh -f tsim/stream/fillHistoryBasic3.sim ./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ./test.sh -f tsim/stream/distributeSession0.sim diff --git a/tests/script/tsim/stream/fillHistoryBasic1.sim b/tests/script/tsim/stream/fillHistoryBasic1.sim index 5bbaf1b7126b4baa29ccb5d1ce5f4535802716c9..4f4fa96bea41715175c7bef01d9ccddacb0608dd 100644 --- a/tests/script/tsim/stream/fillHistoryBasic1.sim +++ b/tests/script/tsim/stream/fillHistoryBasic1.sim @@ -26,7 +26,7 @@ sql insert into t1 values(1648791243003,4,2,3,3.1); sql insert into t1 values(1648791213004,4,2,3,4.1); -sleep 1000 +sleep 5000 sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then @@ -466,9 +466,6 @@ if $data25 != 3 then endi - - - sql create database test2 vgroups 1; sql select * from information_schema.ins_databases; @@ -484,7 +481,7 @@ sql insert into t1 values(1648791213004,4,2,3,4.1); sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); -sleep 1000 +sleep 5000 sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt; if $rows != 4 then diff --git a/tests/script/tsim/stream/fillHistoryBasic2.sim b/tests/script/tsim/stream/fillHistoryBasic2.sim new file mode 100644 index 0000000000000000000000000000000000000000..3af198259d2c7bb4c5c39af3acd39d222f217d1c --- /dev/null +++ b/tests/script/tsim/stream/fillHistoryBasic2.sim @@ -0,0 +1,277 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +print ===== step1 +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi + +print ===== step2 +sql drop stream if exists stream_t1; +sql drop database if exists test; +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL); +sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL); + +sql insert into ts1 values(1648791223002,2,2,3,1.1); +sql insert into ts1 values(1648791233003,3,2,3,2.1); +sql insert into ts2 values(1648791243004,4,2,43,73.1); +sql insert into ts1 values(1648791213002,24,22,23,4.1); +sql insert into ts1 values(1648791243005,4,20,3,3.1); +sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +sql insert into ts3 values(1648791223002,2,2,3,1.1); +sql insert into ts4 values(1648791233003,3,2,3,2.1); +sql insert into ts3 values(1648791243004,4,2,43,73.1); +sql insert into ts4 values(1648791213002,24,22,23,4.1); +sql insert into ts3 values(1648791243005,4,20,3,3.1); +sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; +sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; +sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); +sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; +sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +sql create stream stream_t1 trigger at_once fill_history 1 watermark 1d into streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s); + +sleep 1000 + +$loop_count = 0 +loop1: +sleep 300 +sql select * from streamtST1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 8 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 52 then + print ======data03=$data03 + goto loop1 +endi + +if $data04 != 52 then + print ======data04=$data04 + goto loop1 +endi + +if $data05 != 13 then + print ======data05=$data05 + goto loop1 +endi + +# row 1 +if $data11 != 6 then + print =====data11=$data11 + goto loop1 +endi + +if $data12 != 6 then + print =====data12=$data12 + goto loop1 +endi + +if $data13 != 92 then + print ======$data13 + return -1 +endi + +if $data14 != 22 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 4 then + print =====data21=$data21 + goto loop1 +endi + +if $data22 != 4 then + print =====data22=$data22 + goto loop1 +endi + +if $data23 != 32 then + print ======$data23 + return -1 +endi + +if $data24 != 12 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 30 then + print =====data31=$data31 + goto loop1 +endi + +if $data32 != 30 then + print =====data32=$data32 + goto loop1 +endi + +if $data33 != 180 then + print ======$data33 + return -1 +endi + +if $data34 != 42 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s); + + +sql create database test1 vgroups 4; +sql use test1; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); + +sql insert into ts1 values(1648791211000,1,2,3); +sql insert into ts1 values(1648791222001,2,2,3); +sql insert into ts2 values(1648791211000,1,2,3); +sql insert into ts2 values(1648791222001,2,2,3); + +sql create stream stream_t2 trigger at_once fill_history 1 watermark 20s into streamtST1 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ; + +$loop_count = 0 +loop2: +sql select * from streamtST1; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop2 +endi + +#rows 1 +if $data11 != 2 then + print =====data11=$data11 + goto loop2 +endi + +#max,min selectivity +sql create database test3 vgroups 4; +sql use test3; +sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ; + +sql insert into ts1 values(1648791211000,1,2,3); +sleep 50 +sql insert into ts1 values(1648791222001,2,2,3); +sleep 50 +sql insert into ts2 values(1648791211000,1,2,3); +sleep 50 +sql insert into ts2 values(1648791222001,2,2,3); +sleep 50 + +$loop_count = 0 +loop3: +sql select * from streamtST3; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data02 != 1 then + print =====data02=$data02 + goto loop3 +endi + +# row 1 +if $data12 != 2 then + print =====data12=$data12 + goto loop3 +endi + +system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/fillHistoryBasic3.sim b/tests/script/tsim/stream/fillHistoryBasic3.sim new file mode 100644 index 0000000000000000000000000000000000000000..db8d6bc2d0ac7ae9b8c5d182b3d6eea63c02c7dd --- /dev/null +++ b/tests/script/tsim/stream/fillHistoryBasic3.sim @@ -0,0 +1,203 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create database test vgroups 1; +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL); +sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL); + +sql create stream streams2 trigger at_once fill_history 1 into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s); + +sleep 3000 + +$loop_count = 0 + +loop7: +sleep 50 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop7 +endi + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791213000,1,2,3,1.0); + +$loop_count = 0 + +loop8: +sleep 50 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); + +$loop_count = 0 + +loop9: +sleep 50 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop9 +endi + +sql insert into t1 values(1648791213000,2,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); +sql insert into t1 values(1648791213002,1,2,3,1.0); +sql insert into t2 values(1648791213000,2,2,3,1.0); +sql insert into t2 values(1648791213001,2,2,3,1.0); +sql insert into t2 values(1648791213002,2,2,3,1.0); +sql insert into t2 values(1648791213002,1,2,3,1.0); + +$loop_count = 0 + +loop10: +sleep 50 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop10 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop10 +endi + +if $data11 != 4 thenloop4 + print =====data11=$data11 + goto loop10 +endi + +if $data12 != 2 then + print =====data12=$data12 + goto loop10 +endi + +sql insert into t1 values(1648791223000,1,2,3,1.0); +sql insert into t1 values(1648791223001,1,2,3,1.0); +sql insert into t1 values(1648791223002,3,2,3,1.0); +sql insert into t1 values(1648791223003,3,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); +sql insert into t2 values(1648791223000,1,2,3,1.0); +sql insert into t2 values(1648791223001,1,2,3,1.0); +sql insert into t2 values(1648791223002,3,2,3,1.0); +sql insert into t2 values(1648791223003,3,2,3,1.0); +sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0); + +$loop_count = 0 + +loop11: +sleep 50 +sql select * from test.streamt2 order by c1, c2, c3; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop11 +endi + +if $data02 != 2 then + print =====data02=$data02 + goto loop11 +endi + +if $data11 != 4 then + print =====data11=$data11 + goto loop11 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop11 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop11 +endi + +if $data22 != 2 then + print =====data22=$data22 + goto loop11 +endi + +if $data31 != 2 then + print =====data31=$data31 + goto loop11 +endi + +if $data32 != 3 then + print =====data32=$data32 + goto loop11 +endi + +if $data41 != 4 then + print =====data41=$data41 + goto loop11 +endi + +if $data42 != 1 then + print =====data42=$data42 + goto loop11 +endi