提交 69ca2b2f 编写于 作者: H Haojun Liao

fix(tmq): do some internal refactor.

上级 c3acab20
...@@ -531,159 +531,129 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p ...@@ -531,159 +531,129 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p
return 0; return 0;
} }
} else { // use the consumer specified offset } else { // use the consumer specified offset
// the offset value can not be monotonious increase??
offset = reqOffset; offset = reqOffset;
} }
// this is a normal subscribe requirement // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
// SMqDataRsp dataRsp = {0}; } else { // for taosX
// tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); // todo handle the case where re-balance occurs.
// SMqMetaRsp metaRsp = {0};
// // lock STaosxRsp taosxRsp = {0};
// taosWLockLatch(&pTq->lock); tqInitTaosxRsp(&taosxRsp, pRequest);
//
// qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); if (offset.type != TMQ_OFFSET__LOG) {
// code = tqScanData(pTq, pHandle, &dataRsp, &offset); if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
// return -1;
// // till now, all data has been transferred to consumer, new data needs to push client once arrived. }
// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// taosWUnLockLatch(&pTq->lock);
// return code;
// }
//
// taosWUnLockLatch(&pTq->lock);
// code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
//
// // NOTE: this pHandle->consumerId may have been changed already.
// tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
// ", ts:%" PRId64", reqId:0x%"PRIx64,
// consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
// dataRsp.rspOffset.ts, pRequest->reqId);
//
// tDeleteSMqDataRsp(&dataRsp);
// return code;
}
// todo handle the case where re-balance occurs.
// for taosx
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
if (offset.type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
return -1;
}
if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
} else {
offset = taosxRsp.rspOffset;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
} else {
// if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) { if (metaRsp.metaRspLen > 0) {
// todo refactor: this is not correct. code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
if (savedEpoch > pRequest->epoch) { ",ts:%" PRId64,
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
", found new consumer epoch %d, discard req epoch %d", taosMemoryFree(metaRsp.metaRsp);
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); tDeleteSTaosxRsp(&taosxRsp);
break; return code;
} }
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (taosxRsp.blockNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code; return code;
} else {
offset = taosxRsp.rspOffset;
} }
SWalCont* pHead = &pCkHead->head; tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, ",version:%" PRId64,
pRequest->epoch, vgId, fetchVer, pHead->msgType); consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
} else {
// if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (pHead->msgType == TDMT_VND_SUBMIT) { walSetReaderCapacity(pHandle->pWalReader, 2048);
SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { while (1) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, // todo refactor: this is not correct.
pRequest->subKey); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
return -1; if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
} }
if (taosxRsp.blockNum > 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
return code; return code;
} else {
fetchVer++;
} }
} else { SWalCont* pHead = &pCkHead->head;
/*A(pHandle->fetchMeta);*/ tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
/*A(IS_META_MSG(pHead->msgType));*/ consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); if (pHead->msgType == TDMT_VND_SUBMIT) {
metaRsp.resMsgType = pHead->msgType; SPackedData submit = {
metaRsp.metaRspLen = pHead->bodyLen; .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
metaRsp.metaRsp = pHead->body; .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { .ver = pHead->version,
code = -1; };
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
return -1;
}
if (taosxRsp.blockNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
} else {
fetchVer++;
}
} else {
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
code = -1;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
code = 0;
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
code = 0;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
} }
} }
}
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
return 0; return 0;
}
} }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册