提交 a9e75c87 编写于 作者: L Liu Jicong

refactor(stream): internal refactor

上级 938868e3
...@@ -244,11 +244,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -244,11 +244,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// 1.find handle // 1.find handle
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
if (pHandle == NULL) { if (pHandle == NULL) {
...@@ -270,6 +265,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -270,6 +265,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch); consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
} }
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
// 2.reset offset if needed // 2.reset offset if needed
if (reqOffset.type > 0) { if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset; fetchOffsetNew = reqOffset;
...@@ -279,7 +279,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -279,7 +279,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffsetNew = pOffset->val; fetchOffsetNew = pOffset->val;
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew); tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %s", consumerId, pHandle->subKey, formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
...@@ -295,8 +295,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -295,8 +295,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal));
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", consumerId, tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed",
TD_VID(pTq->pVnode), pReq->subKey); pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1; return -1;
} }
...@@ -307,14 +307,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -307,14 +307,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (!pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0); ASSERT(0);
code = -1; code = -1;
goto OVER; goto OVER;
} }
if (dataRsp.blockNum == 0) { if (dataRsp.blockNum == 0) {
// add to async task // TODO add to async task
/*dataRsp.rspOffset.version--;*/
} }
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
...@@ -322,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -322,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER; goto OVER;
} }
if (pHandle->fetchMeta && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
...@@ -334,8 +336,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -334,8 +336,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch); consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d", tqWarn(
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); "tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %ld, found new consumer epoch %d, discard req "
"epoch %d",
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break; break;
} }
......
...@@ -135,6 +135,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -135,6 +135,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t code = tqRetrieveDataBlock(&ret->data, pReader); int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) { if (code != 0 || ret->data.info.rows == 0) {
ASSERT(0); ASSERT(0);
continue;
#if 0 #if 0
if (fromProcessedMsg) { if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
......
...@@ -150,6 +150,7 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { ...@@ -150,6 +150,7 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) { if (ver == pRead->curVersion) {
wDebug("wal version %ld match, no need to reset", ver);
return 0; return 0;
} }
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
...@@ -177,6 +178,8 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { ...@@ -177,6 +178,8 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
pRead->curVersion = ver; pRead->curVersion = ver;
return 0; return 0;
...@@ -249,6 +252,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -249,6 +252,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
} }
pRead->curVersion = ver + 1; pRead->curVersion = ver + 1;
wDebug("version advance to %ld, fetch body", pRead->curVersion);
return 0; return 0;
} }
...@@ -265,6 +269,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { ...@@ -265,6 +269,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
} }
pRead->curVersion++; pRead->curVersion++;
wDebug("version advance to %ld, skip fetch", pRead->curVersion);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册