diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ff2e419c7528c18be20e6677cd46b7e4695486fc..b50367af038dd45c4e021a05ac5b97634b39aa7d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -291,6 +291,109 @@ typedef struct { SSchema* pSchema; } SSchemaWrapper; +static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { + SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); + if (pSW == NULL) return pSW; + pSW->nCols = pSchemaWrapper->nCols; + pSW->sver = pSchemaWrapper->sver; + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + taosMemoryFree(pSW); + return NULL; + } + memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); + return pSW; +} + +static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { + taosMemoryFree(pSchemaWrapper->pSchema); + taosMemoryFree(pSchemaWrapper); +} + +static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI8(buf, pSchema->flags); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeFixedI16(buf, pSchema->colId); + tlen += taosEncodeString(buf, pSchema->name); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI8(buf, &pSchema->flags); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeFixedI16(buf, &pSchema->colId); + buf = taosDecodeStringTo(buf, pSchema->name); + return (void*)buf; +} + +static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) { + if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; + if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; + if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; + if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1; + if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; + return 0; +} + +static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) { + if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; + if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; + return 0; +} + +static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { + int32_t tlen = 0; + tlen += taosEncodeVariantI32(buf, pSW->nCols); + tlen += taosEncodeVariantI32(buf, pSW->sver); + for (int32_t i = 0; i < pSW->nCols; i++) { + tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { + buf = taosDecodeVariantI32(buf, &pSW->nCols); + buf = taosDecodeVariantI32(buf, &pSW->sver); + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } + + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + return (void*)buf; +} + +static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { + if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; + } + + return 0; +} + +static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { + if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; + } + + return 0; +} + STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); typedef struct { @@ -1873,7 +1976,7 @@ typedef struct SMqHbTopicInfo { int32_t epoch; int64_t topicUid; char name[TSDB_TOPIC_FNAME_LEN]; - SArray* pVgInfo; + SArray* pVgInfo; // SArray } SMqHbTopicInfo; static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { @@ -1992,49 +2095,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) return (void*)buf; } -typedef struct { - int8_t reserved; -} SMqRebVgRsp; - -typedef struct { - int64_t leftForVer; - int32_t vgId; - int32_t epoch; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - char* sql; - char* physicalPlan; - char* qmsg; -} SMqSetCVgReq; - -static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pReq->leftForVer); - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI32(buf, pReq->epoch); - tlen += taosEncodeFixedI64(buf, pReq->consumerId); - tlen += taosEncodeString(buf, pReq->topicName); - tlen += taosEncodeString(buf, pReq->cgroup); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, pReq->qmsg); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { - buf = taosDecodeFixedI64(buf, &pReq->leftForVer); - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI32(buf, &pReq->epoch); - buf = taosDecodeFixedI64(buf, &pReq->consumerId); - buf = taosDecodeStringTo(buf, pReq->topicName); - buf = taosDecodeStringTo(buf, pReq->cgroup); - buf = taosDecodeString(buf, &pReq->sql); - buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = taosDecodeString(buf, &pReq->qmsg); - return buf; -} - typedef struct { int32_t vgId; int64_t offset; @@ -2056,109 +2116,6 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); -static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { - SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); - if (pSW == NULL) return pSW; - pSW->nCols = pSchemaWrapper->nCols; - pSW->sver = pSchemaWrapper->sver; - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - taosMemoryFree(pSW); - return NULL; - } - memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); - return pSW; -} - -static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { - taosMemoryFree(pSchemaWrapper->pSchema); - taosMemoryFree(pSchemaWrapper); -} - -static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { - int32_t tlen = 0; - tlen += taosEncodeFixedI8(buf, pSchema->type); - tlen += taosEncodeFixedI8(buf, pSchema->flags); - tlen += taosEncodeFixedI32(buf, pSchema->bytes); - tlen += taosEncodeFixedI16(buf, pSchema->colId); - tlen += taosEncodeString(buf, pSchema->name); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { - buf = taosDecodeFixedI8(buf, &pSchema->type); - buf = taosDecodeFixedI8(buf, &pSchema->flags); - buf = taosDecodeFixedI32(buf, &pSchema->bytes); - buf = taosDecodeFixedI16(buf, &pSchema->colId); - buf = taosDecodeStringTo(buf, pSchema->name); - return (void*)buf; -} - -static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) { - if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; - if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; - if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; - if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1; - if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; - return 0; -} - -static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) { - if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; - if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; - if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; - return 0; -} - -static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { - int32_t tlen = 0; - tlen += taosEncodeVariantI32(buf, pSW->nCols); - tlen += taosEncodeVariantI32(buf, pSW->sver); - for (int32_t i = 0; i < pSW->nCols; i++) { - tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { - buf = taosDecodeVariantI32(buf, &pSW->nCols); - buf = taosDecodeVariantI32(buf, &pSW->sver); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } - - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); - } - return (void*)buf; -} - -static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { - if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; - if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; - for (int32_t i = 0; i < pSW->nCols; i++) { - if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; - } - - return 0; -} - -static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { - if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; - if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) return -1; - for (int32_t i = 0; i < pSW->nCols; i++) { - if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; - } - - return 0; -} - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; @@ -2427,6 +2384,21 @@ typedef struct { SEpSet epSet; } SMqSubVgEp; +static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgEp->vgId); + tlen += taosEncodeFixedI64(buf, pVgEp->offset); + tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { + buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeFixedI64(buf, &pVgEp->offset); + buf = taosDecodeSEpSet(buf, &pVgEp->epSet); + return buf; +} + typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int8_t isSchemaAdaptive; @@ -2434,6 +2406,43 @@ typedef struct { SSchemaWrapper schema; } SMqSubTopicEp; +static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pTopicEp->topic); + tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); + int32_t sz = taosArrayGetSize(pTopicEp->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); + tlen += tEncodeSMqSubVgEp(buf, pVgEp); + } + tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { + buf = taosDecodeStringTo(buf, pTopicEp->topic); + buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); + if (pTopicEp->vgs == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp vgEp; + buf = tDecodeSMqSubVgEp(buf, &vgEp); + taosArrayPush(pTopicEp->vgs, &vgEp); + } + buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); + return buf; +} + +static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { + // taosMemoryFree(pSubTopicEp->schema.pSchema); + taosArrayDestroy(pSubTopicEp->vgs); +} + typedef struct { SMqRspHead head; int64_t reqOffset; @@ -2512,58 +2521,6 @@ typedef struct { SArray* topics; // SArray } SMqAskEpRsp; -static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - // taosMemoryFree(pSubTopicEp->schema.pSchema); - taosArrayDestroy(pSubTopicEp->vgs); -} - -static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pVgEp->vgId); - tlen += taosEncodeFixedI64(buf, pVgEp->offset); - tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { - buf = taosDecodeFixedI32(buf, &pVgEp->vgId); - buf = taosDecodeFixedI64(buf, &pVgEp->offset); - buf = taosDecodeSEpSet(buf, &pVgEp->epSet); - return buf; -} - -static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pTopicEp->topic); - tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); - int32_t sz = taosArrayGetSize(pTopicEp->vgs); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); - tlen += tEncodeSMqSubVgEp(buf, pVgEp); - } - tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { - buf = taosDecodeStringTo(buf, pTopicEp->topic); - buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); - if (pTopicEp->vgs == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp vgEp; - buf = tDecodeSMqSubVgEp(buf, &vgEp); - taosArrayPush(pTopicEp->vgs, &vgEp); - } - buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); - return buf; -} - static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) { int32_t tlen = 0; // tlen += taosEncodeString(buf, pRsp->cgroup); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 93a950466b9164f4aa342a86976ce9e38b20fa1c..e541c214deba8e9b9ad3cc4e95cc2d2224f3c5a3 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); +int64_t walGetCommittedVer(SWal *); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2751e0752eee6d77fce25b90e5c281614e43da02..5eb6866387a8062d63a1c91c3a11b5751b3f94d8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -452,6 +452,7 @@ typedef struct { int8_t withSchema; int8_t withTag; SRWLatch lock; + int32_t consumerCnt; int32_t sqlLen; int32_t astLen; char* sql; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2a81f28eddef99b673a061780ed8a7fcbcf7fac6..5ad4863322e11a4f1abab2f12ece752ea569b431 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -787,6 +787,8 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock } } + // do not show for cleared subscription +#if 0 int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); @@ -829,6 +831,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock numOfRows++; } +#endif taosRUnLockLatch(&pSub->lock); sdbRelease(pSdb, pSub); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b62de0e06e719c766d9e88cdb60639da3b15dcfc..41d4d5f40651ebe85917550c17f5c720569dbc6e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -89,6 +89,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); + + SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); @@ -152,6 +154,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); if (pTopic->sql == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6ca60945cdd05aba42830fa5b3bbd77c04aa4680..4b9551b2500e036331147e436e99a9fb8b08d613 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { - fetchOffset = walGetLastVer(pTq->pWal); + fetchOffset = walGetCommittedVer(pTq->pWal); } else { fetchOffset = pReq->currentOffset + 1; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 3e6663f4647eb059e1391953892eebbaf64c7a73..9aa848a7bb2d514533e60195a49aaa9c3539e88a 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -25,6 +25,8 @@ int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotV int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } +int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; } + static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index e14515286e6efd7600a9a9018a29545e0df2e6be..0cfe75bf333a0abaf0684fd05ce8a20adf811170 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -143,7 +143,11 @@ void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capa int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { int32_t code; + // TODO: valid ver + if (ver > pRead->pWal->vers.commitVer) { + return -1; + } if (pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver);