diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1282b0a94de5cc7b7775693049367bac59f7d1d5..8279ee7aeb17a69ea4ec3d21eca0eca2e6735783 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -531,159 +531,129 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p return 0; } } else { // use the consumer specified offset + // the offset value can not be monotonious increase?? offset = reqOffset; } // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); -// SMqDataRsp dataRsp = {0}; -// tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); -// -// // lock -// taosWLockLatch(&pTq->lock); -// -// qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); -// code = tqScanData(pTq, pHandle, &dataRsp, &offset); -// -// // till now, all data has been transferred to consumer, new data needs to push client once arrived. -// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && -// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { -// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// taosWUnLockLatch(&pTq->lock); -// return code; -// } -// -// taosWUnLockLatch(&pTq->lock); -// code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); -// -// // NOTE: this pHandle->consumerId may have been changed already. -// tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 -// ", ts:%" PRId64", reqId:0x%"PRIx64, -// consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, -// dataRsp.rspOffset.ts, pRequest->reqId); -// -// tDeleteSMqDataRsp(&dataRsp); -// return code; - } - - // todo handle the case where re-balance occurs. - // for taosx - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); - - if (offset.type != TMQ_OFFSET__LOG) { - if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { - return -1; - } - - if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, - metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } else { - offset = taosxRsp.rspOffset; - } - - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64, - consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, - taosxRsp.rspOffset.version); - } else { - -// if (offset.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = offset.version + 1; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - walSetReaderCapacity(pHandle->pWalReader, 2048); + } else { // for taosX + // todo handle the case where re-balance occurs. + SMqMetaRsp metaRsp = {0}; + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); + + if (offset.type != TMQ_OFFSET__LOG) { + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { + return -1; + } - while (1) { - // todo refactor: this is not correct. - int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; + if (metaRsp.metaRspLen > 0) { + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); + taosMemoryFree(metaRsp.metaRsp); + tDeleteSTaosxRsp(&taosxRsp); + return code; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); return code; + } else { + offset = taosxRsp.rspOffset; } - SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, - pRequest->epoch, vgId, fetchVer, pHead->msgType); + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" PRId64, + consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, + taosxRsp.rspOffset.version); + } else { + // if (offset.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = offset.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { + tDeleteSTaosxRsp(&taosxRsp); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - if (pHead->msgType == TDMT_VND_SUBMIT) { - SPackedData submit = { - .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), - .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), - .ver = pHead->version, - }; + walSetReaderCapacity(pHandle->pWalReader, 2048); - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, - pRequest->subKey); - return -1; + while (1) { + // todo refactor: this is not correct. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); + break; } - if (taosxRsp.blockNum > 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; - } else { - fetchVer++; } - } else { - /*A(pHandle->fetchMeta);*/ - /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; + SWalCont* pHead = &pCkHead->head; + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", + consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); + + if (pHead->msgType == TDMT_VND_SUBMIT) { + SPackedData submit = { + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), + .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), + .ver = pHead->version, + }; + + if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, + pRequest->subKey); + return -1; + } + + if (taosxRsp.blockNum > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } else { + fetchVer++; + } + + } else { + /*A(pHandle->fetchMeta);*/ + /*A(IS_META_MSG(pHead->msgType));*/ + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { + code = -1; + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } + code = 0; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); return code; } - code = 0; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; } } - } - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return 0; + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return 0; + } } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {