diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c4dd335aa4f85f47b617464cd7c1712a7470abe4..a2f84c6ba37bcb22aa56e35d44859df7dba8f545 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -244,11 +244,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqOffsetVal fetchOffsetNew; // 1.find handle - char buf[80]; - tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), buf); - STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { @@ -270,6 +265,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); } + char buf[80]; + tFormatOffset(buf, 80, &reqOffset); + tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, + pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); + // 2.reset offset if needed if (reqOffset.type > 0) { fetchOffsetNew = reqOffset; @@ -279,7 +279,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffsetNew = pOffset->val; char formatBuf[80]; tFormatOffset(formatBuf, 80, &fetchOffsetNew); - tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); + tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %s", consumerId, pHandle->subKey, formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -295,8 +295,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { - tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", consumerId, - TD_VID(pTq->pVnode), pReq->subKey); + tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", + pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return -1; } @@ -307,14 +307,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); - if (!pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) { + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { + fetchOffsetNew.version++; if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { ASSERT(0); code = -1; goto OVER; } if (dataRsp.blockNum == 0) { - // add to async task + // TODO add to async task + /*dataRsp.rspOffset.version--;*/ } if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; @@ -322,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { goto OVER; } - if (pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) { + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { int64_t fetchVer = fetchOffsetNew.version + 1; SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { @@ -334,8 +336,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { while (1) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { - tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d", - consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + tqWarn( + "tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %ld, found new consumer epoch %d, discard req " + "epoch %d", + consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); break; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 504b040ed91d1c1aeb5686d76f60fcde3aca9d19..2d51073abf6a9c60a94b60d347459eba2a464239 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -135,6 +135,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t code = tqRetrieveDataBlock(&ret->data, pReader); if (code != 0 || ret->data.info.rows == 0) { ASSERT(0); + continue; #if 0 if (fromProcessedMsg) { ret->fetchType = FETCH_TYPE__NONE; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1ccb9cf15299868fe624ab7e74adffa561bc11e5..8960703990ca2b35b7dd695b4eb1d039d5c95687 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -150,6 +150,7 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { SWal *pWal = pRead->pWal; if (ver == pRead->curVersion) { + wDebug("wal version %ld match, no need to reset", ver); return 0; } if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { @@ -177,6 +178,8 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { return -1; } + wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver); + pRead->curVersion = ver; return 0; @@ -249,6 +252,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { } pRead->curVersion = ver + 1; + wDebug("version advance to %ld, fetch body", pRead->curVersion); return 0; } @@ -265,6 +269,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { } pRead->curVersion++; + wDebug("version advance to %ld, skip fetch", pRead->curVersion); return 0; }