提交 7a655895 编写于 作者: L Liu Jicong

feat(wal): add api to check log existance

上级 4f107908
...@@ -210,6 +210,8 @@ void walCloseRef(SWalRef *); ...@@ -210,6 +210,8 @@ void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver); int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *); int32_t walUnrefVer(SWal *);
bool walLogExist(SWal *, int64_t ver);
// lifecycle check // lifecycle check
bool walIsEmpty(SWal *); bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *); int64_t walGetFirstVer(SWal *);
......
...@@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
char offsetFormatBuf[50]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId, tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
......
...@@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot) { if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (!pHandle->fetchMeta) { if (!pHandle->fetchMeta) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToData(&fetchOffsetNew, 0, 0);
} else { } else {
...@@ -375,10 +375,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -375,10 +375,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree(pHeadWithCkSum); taosMemoryFree(pHeadWithCkSum);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
// 1. set uid and ts
// 2. get data (rebuild reader if needed)
// 3. get new uid and ts
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts); tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0); ASSERT(0);
......
...@@ -19,6 +19,10 @@ ...@@ -19,6 +19,10 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; } bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; }
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
......
...@@ -36,7 +36,11 @@ ...@@ -36,7 +36,11 @@
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32) #define MAX_VGROUP_CNT (32)
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID; typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
NOTIFY_CMD_ID_BUTT,
} NOTIFY_CMD_ID;
typedef struct { typedef struct {
TdThread thread; TdThread thread;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册