提交 eda06081 编写于 作者: H Haojun Liao

enh(stream): refactor and serialize the attributes of history tasks.

上级 d9c364d6
...@@ -54,6 +54,11 @@ typedef struct SSessionKey { ...@@ -54,6 +54,11 @@ typedef struct SSessionKey {
uint64_t groupId; uint64_t groupId;
} SSessionKey; } SSessionKey;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2; SWinKey* pWin2 = (SWinKey*)pKey2;
......
...@@ -276,6 +276,11 @@ typedef struct SStreamStatus { ...@@ -276,6 +276,11 @@ typedef struct SStreamStatus {
int8_t keepTaskStatus; int8_t keepTaskStatus;
} SStreamStatus; } SStreamStatus;
typedef struct SHistoryDataRange {
SVersionRange range;
STimeWindow window;
} SHistoryDataRange;
struct SStreamTask { struct SStreamTask {
SStreamId id; SStreamId id;
int32_t totalLevel; int32_t totalLevel;
...@@ -290,9 +295,9 @@ struct SStreamTask { ...@@ -290,9 +295,9 @@ struct SStreamTask {
STaskExec exec; STaskExec exec;
int8_t fillHistory; // fill history int8_t fillHistory; // fill history
int64_t ekey; // end ts key SHistoryDataRange dataRange;
int64_t endVer; // end version
SStreamId historyTaskId; SStreamId historyTaskId;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId; int32_t nextCheckId;
......
...@@ -476,7 +476,7 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea ...@@ -476,7 +476,7 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
} }
} }
// persistent stream task for history data // persistent stream task for already stored ts data
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
level = taosArrayGetSize(pStream->pHTasksList); level = taosArrayGetSize(pStream->pHTasksList);
...@@ -493,7 +493,6 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea ...@@ -493,7 +493,6 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
} }
} }
return 0; return 0;
} }
...@@ -639,6 +638,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -639,6 +638,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pReq->head.vgId = htonl(pTask->nodeId); pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
STransAction action = {0}; STransAction action = {0};
......
...@@ -64,7 +64,6 @@ typedef struct STsdbReadSnap STsdbReadSnap; ...@@ -64,7 +64,6 @@ typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo; typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo; typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter; typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol; typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
...@@ -376,11 +375,6 @@ struct TSDBKEY { ...@@ -376,11 +375,6 @@ struct TSDBKEY {
TSKEY ts; TSKEY ts;
}; };
struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
};
typedef struct SMemSkipListNode SMemSkipListNode; typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode { struct SMemSkipListNode {
int8_t level; int8_t level;
......
...@@ -187,6 +187,13 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -187,6 +187,13 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1; if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo); int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1; if (tEncodeI32(pEncoder, epSz) < 0) return -1;
for (int32_t i = 0; i < epSz; i++) { for (int32_t i = 0; i < epSz; i++) {
...@@ -240,6 +247,13 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -240,6 +247,13 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
int32_t epSz; int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册