未验证 提交 91583bb2 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15168 from taosdata/feature/stream

feat(wal): log applied ver
......@@ -64,6 +64,7 @@ typedef struct {
int64_t verInSnapshotting;
int64_t snapshotVer;
int64_t commitVer;
int64_t appliedVer;
int64_t lastVer;
} SWalVer;
......@@ -172,6 +173,9 @@ int32_t walRollback(SWal *, int64_t ver);
int32_t walBeginSnapshot(SWal *, int64_t ver);
int32_t walEndSnapshot(SWal *);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// for tq
int32_t walApplyVer(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
......@@ -186,7 +190,6 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId;
int64_t ver;
......@@ -206,6 +209,7 @@ int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
int64_t walGetCommittedVer(SWal *);
int64_t walGetAppliedVer(SWal *);
#ifdef __cplusplus
}
......
......@@ -265,6 +265,10 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int64_t consumerId = be64toh(pReq->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
atomic_store_32(&pConsumer->hbStatus, 0);
......
......@@ -237,6 +237,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver);
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
......@@ -253,4 +255,3 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
return 0;
}
......@@ -878,6 +878,8 @@ _exit:
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
}
vDebug("successful submit in vg %d version %ld", pVnode->config.vgId, version);
return 0;
}
......
......@@ -33,6 +33,8 @@ int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
......
......@@ -66,9 +66,15 @@ void walCloseReader(SWalReader *pRead) {
}
int32_t walNextValidMsg(SWalReader *pRead) {
wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId);
int64_t fetchVer = pRead->curVersion;
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
int64_t lastVer = walGetLastVer(pRead->pWal);
int64_t committedVer = walGetCommittedVer(pRead->pWal);
int64_t appliedVer = walGetAppliedVer(pRead->pWal);
int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer;
endVer = TMIN(appliedVer, endVer);
wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld",
pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pRead, fetchVer) < 0) {
return -1;
......
......@@ -64,6 +64,12 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walApplyVer(SWal *pWal, int64_t ver) {
// TODO: error check
pWal->vers.appliedVer = ver;
return 0;
}
int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
......
......@@ -225,7 +225,7 @@ class TDTestCase:
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2() TD-17267
# self.tmqCase2() # TD-17267
def stop(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册