提交 9628a9f7 编写于 作者: wmmhello's avatar wmmhello

fix:open info log in tmq & ignore wal apply ver when read wal

上级 bb86f5c5
...@@ -712,7 +712,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us ...@@ -712,7 +712,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
end: end:
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
if(pParamSet->callbackFn != NULL) { if(pCommitFp != NULL) {
pCommitFp(tmq, code, userParam); pCommitFp(tmq, code, userParam);
} }
return; return;
......
...@@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){ ...@@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
bool mndRebTryStart() { bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mDebug("tq timer, rebalance counter old val:%d", old); mInfo("tq timer, rebalance counter old val:%d", old);
return old == 0; return old == 0;
} }
...@@ -116,7 +116,7 @@ void mndRebCntDec() { ...@@ -116,7 +116,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1; int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) { if (oldVal == val) {
mDebug("rebalance trans end, rebalance counter:%d", newVal); mInfo("rebalance trans end, rebalance counter:%d", newVal);
break; break;
} }
} }
...@@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -281,7 +281,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// rebalance cannot be parallel // rebalance cannot be parallel
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
mDebug("mq rebalance already in progress, do nothing"); mInfo("mq rebalance already in progress, do nothing");
return 0; return 0;
} }
...@@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -312,7 +312,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus); hbStatus);
...@@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { ...@@ -416,7 +416,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
for(int i = 0; i < taosArrayGetSize(req.topics); i++){ for(int i = 0; i < taosArrayGetSize(req.topics); i++){
TopicOffsetRows* data = taosArrayGet(req.topics, i); TopicOffsetRows* data = taosArrayGet(req.topics, i);
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); mInfo("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){ if(pSub == NULL){
...@@ -1104,13 +1104,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1104,13 +1104,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
} }
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {
mDebug("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId); mInfo("showing consumer:0x%" PRIx64 " no assigned topic, skip", pConsumer->consumerId);
sdbRelease(pSdb, pConsumer); sdbRelease(pSdb, pConsumer);
continue; continue;
} }
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
mDebug("showing consumer:0x%" PRIx64, pConsumer->consumerId); mInfo("showing consumer:0x%" PRIx64, pConsumer->consumerId);
int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics); int32_t topicSz = taosArrayGetSize(pConsumer->assignedTopics);
bool hasTopic = true; bool hasTopic = true;
......
...@@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock ...@@ -1207,7 +1207,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
int32_t numOfRows = 0; int32_t numOfRows = 0;
SMqSubscribeObj *pSub = NULL; SMqSubscribeObj *pSub = NULL;
mDebug("mnd show subscriptions begin"); mInfo("mnd show subscriptions begin");
while (numOfRows < rowsCapacity) { while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
...@@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock ...@@ -1247,7 +1247,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
} }
mDebug("mnd end show subscriptions"); mInfo("mnd end show subscriptions");
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
......
...@@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -703,7 +703,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0; int32_t code = 0;
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
...@@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -784,7 +784,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return -1; return -1;
} }
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = NULL; STqHandle* pHandle = NULL;
......
...@@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -70,17 +70,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t fetchVer = pReader->curVersion; int64_t fetchVer = pReader->curVersion;
int64_t lastVer = walGetLastVer(pReader->pWal); int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal); // int64_t appliedVer = walGetAppliedVer(pReader->pWal);
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] // if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); // wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
} // }
int64_t endVer = TMIN(appliedVer, committedVer); // int64_t endVer = TMIN(appliedVer, committedVer);
int64_t endVer = committedVer;
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64", end index:%" PRId64, ", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, endVer);
if (fetchVer > endVer){ if (fetchVer > endVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
...@@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -370,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
pRead->pWal->vers.appliedVer); pRead->pWal->vers.appliedVer);
// TODO: valid ver // TODO: valid ver
if (ver > pRead->pWal->vers.appliedVer) { // if (ver > pRead->pWal->vers.appliedVer) {
return -1; // return -1;
} // }
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReaderSeekVer(pRead, ver); code = walReaderSeekVer(pRead, ver);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册