diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b35dc71ed974e656b94b30562ee895a9f81fcd20..033e4fc5b89a189500b3105bb52c5c990b8d15e0 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -72,7 +72,7 @@ typedef struct { typedef struct { int64_t suid; - char* qmsg; // SubPlanToString + char* qmsg; // SubPlanToString SNode* node; } STqExecTb; @@ -98,18 +98,21 @@ typedef enum tq_handle_status { } tq_handle_status; typedef struct { - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - int64_t consumerId; - int32_t epoch; - int8_t fetchMeta; - int64_t snapshotVer; - SWalReader* pWalReader; - SWalRef* pRef; -// STqPushHandle pushHandle; // push - STqExecHandle execHandle; // exec - SRpcMsg* msg; - tq_handle_status status; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int64_t consumerId; + int32_t epoch; + int8_t fetchMeta; + int64_t snapshotVer; + SWalReader* pWalReader; + SWalRef* pRef; + // STqPushHandle pushHandle; // push + STqExecHandle execHandle; // exec + SRpcMsg* msg; + tq_handle_status status; } STqHandle; +typedef struct { + int64_t snapshotVer; +} SStreamHandle; struct STQ { SVnode* pVnode; diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 97dabe8f264217e03f60f6a66bee492f2bc6f9c7..8801b450f949d071472484ab185e3c3bce8d30b9 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -88,17 +88,20 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { goto _exit; } + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } - // tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - // tDecodeSTqHandle(&decoder, &handle); - // tDecoderClear(&decoder); - - if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { - tdbTbcMoveToNext(pReader->pCur); - break; - } else { - tdbTbcMoveToNext(pReader->pCur); + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + code = tDecodeStreamTask(&decoder, pTask); + if (code < 0) { + tDecoderClear(&decoder); + taosMemoryFree(pTask); + goto _err; } + tDecoderClear(&decoder); } *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); @@ -115,13 +118,12 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), handle.snapshotVer, handle.subKey, vLen); + return code; _exit: return code; - _err: tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); return code; - return 0; } // STqSnapWriter ========================================