diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2f93f8c3e3594e05dca9474c1033b6a1a25bf6f0..8a1e95a6612894bd8cb27b4443e2bdbf16be72c1 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,6 +54,11 @@ typedef struct SSessionKey { uint64_t groupId; } SSessionKey; +typedef struct SVersionRange { + uint64_t minVer; + uint64_t maxVer; +} SVersionRange; + static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin2 = (SWinKey*)pKey2; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6d54790b2f49dc3d24b6abbb9f713fc4407e62a1..2e2663f87a632857f098ab81eba0fefbe6519750 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -276,6 +276,11 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; } SStreamStatus; +typedef struct SHistoryDataRange { + SVersionRange range; + STimeWindow window; +} SHistoryDataRange; + struct SStreamTask { SStreamId id; int32_t totalLevel; @@ -290,9 +295,9 @@ struct SStreamTask { STaskExec exec; int8_t fillHistory; // fill history - int64_t ekey; // end ts key - int64_t endVer; // end version - SStreamId historyTaskId; + SHistoryDataRange dataRange; + SStreamId historyTaskId; + // children info SArray* childEpInfo; // SArray int32_t nextCheckId; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 81b985f51541574b43b76563091eb718900169c4..1ce4ce2b7e5f892342db07131b9be3817a24a6c8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -476,7 +476,7 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } } - // persistent stream task for history data + // persistent stream task for already stored ts data if (pStream->conf.fillHistory) { level = taosArrayGetSize(pStream->pHTasksList); @@ -493,7 +493,6 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } } - return 0; } @@ -639,6 +638,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pReq->head.vgId = htonl(pTask->nodeId); pReq->taskId = pTask->id.taskId; STransAction action = {0}; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index faf550ab75a81f284ca414985f87aa8db54e8cd3..f9dd80a05615b3e15f4728dd8b933d93649bc425 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,7 +64,6 @@ typedef struct STsdbReadSnap STsdbReadSnap; typedef struct SBlockInfo SBlockInfo; typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; -typedef struct SVersionRange SVersionRange; typedef struct SLDataIter SLDataIter; typedef struct SDiskCol SDiskCol; typedef struct SDiskData SDiskData; @@ -376,11 +375,6 @@ struct TSDBKEY { TSKEY ts; }; -struct SVersionRange { - uint64_t minVer; - uint64_t maxVer; -}; - typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d8ec11f44eaa7fa046d4d7f9589eb7c72546395..4f883b76e4e225e77c5fe685645a849643d6345b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -187,6 +187,13 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; + if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; + int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { @@ -240,6 +247,13 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; + if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; + if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; + if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; + int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));