From cf64d4c9c55a104b9b72a3b69d6e05f7c4175d1d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 12 Jul 2023 17:22:23 +0800 Subject: [PATCH] fix:set get_assignment offset to first version of response block --- include/common/tmsg.h | 8 + include/common/tmsgdef.h | 2 +- source/client/src/clientTmq.c | 125 ++++++++++---- source/client/test/clientTests.cpp | 20 ++- source/common/src/tmsg.c | 42 +++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 170 ++++++++++++-------- source/dnode/vnode/src/tq/tqUtil.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 7 +- 10 files changed, 271 insertions(+), 109 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2d75424bb5..0c58b470c2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3371,6 +3371,12 @@ typedef struct { int8_t reserved; } SMqHbRsp; +typedef struct { + SMsgHead head; + int64_t consumerId; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; +} SMqSeekReq; + #define TD_AUTO_CREATE_TABLE 0x1 typedef struct { int64_t suid; @@ -3500,6 +3506,8 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeatroySMqHbReq(SMqHbReq* pReq); +int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq); +int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq); #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3fc94f4408..3f4335af94 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -306,7 +306,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK_TO_OFFSET, "vnode-tmq-seekto-offset", STqOffset, STqOffset) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK, "vnode-tmq-seek", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 50a42d547c..5879de2e30 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -140,6 +140,7 @@ enum { typedef struct SVgOffsetInfo { STqOffsetVal committedOffset; STqOffsetVal currentOffset; + STqOffsetVal seekOffset; // the first version in block for seek operation int64_t walVerBegin; int64_t walVerEnd; } SVgOffsetInfo; @@ -214,6 +215,11 @@ typedef struct SMqVgCommon { int32_t code; } SMqVgCommon; +typedef struct SMqSeekParam { + tsem_t sem; + int32_t code; +} SMqSeekParam; + typedef struct SMqVgWalInfoParam { int32_t vgId; int32_t epoch; @@ -821,7 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; - offRows->offset = pVg->offsetInfo.currentOffset; + offRows->offset = pVg->offsetInfo.seekOffset; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); @@ -1479,6 +1485,7 @@ CREATE_MSG_FAIL: typedef struct SVgroupSaveInfo { STqOffsetVal currentOffset; STqOffsetVal commitOffset; + STqOffsetVal seekOffset; int64_t numOfRows; } SVgroupSaveInfo; @@ -1518,6 +1525,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; + clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew; clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; @@ -1577,7 +1585,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } @@ -1879,10 +1887,11 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p return 0; } -static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){ +static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - pVg->offsetInfo.currentOffset = *offset; + pVg->offsetInfo.seekOffset = *reqOffset; + pVg->offsetInfo.currentOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } @@ -1944,7 +1953,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); + updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); @@ -1994,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); + updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); @@ -2022,7 +2031,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); + updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); if (pollRspWrapper->taosxRsp.blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2545,6 +2554,8 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&pCommon->rsp); } + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pParam); return 0; } @@ -2615,7 +2626,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } tmq_topic_assignment* pAssignment = &(*assignment)[j]; - pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; + pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version; pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; @@ -2654,6 +2665,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqPollReq req = {0}; tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg); + req.reqOffset = pClientVg->offsetInfo.seekOffset; int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { @@ -2750,6 +2762,17 @@ void tmq_free_assignment(tmq_topic_assignment* pAssignment) { taosMemoryFree(pAssignment); } +static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { + if (pMsg) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + SMqSeekParam* pParam = param; + pParam->code = code; + tsem_post(&pParam->sem); + return 0; +} + int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL) { tscError("invalid tmq handle, null"); @@ -2803,35 +2826,71 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ // update the offset, and then commit to vnode pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; + pOffsetInfo->seekOffset = pOffsetInfo->currentOffset; // pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; + tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); + + SMqSeekReq req = {0}; + snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName); + req.head.vgId = pVg->vgId; + req.consumerId = tmq->consumerId; + + int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req); + if (msgSize < 0) { + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_PAR_INTERNAL_ERROR; + } + + char* msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) { + taosMemoryFree(msg); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_PAR_INTERNAL_ERROR; + } + + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + taosMemoryFree(msg); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam)); + if (pParam == NULL) { + taosMemoryFree(msg); + taosMemoryFree(sendInfo); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_OUT_OF_MEMORY; + } + tsem_init(&pParam->sem, 0, 0); + + sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqSeekCb; + sendInfo->msgType = TDMT_VND_TMQ_SEEK; - tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + int64_t transporterId = 0; + tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, + tmq->consumerId, pTopic->topicName, vgId, tmq->epoch); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); taosWUnLockLatch(&tmq->lock); -// SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; -// tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); -// -// SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); -// if (pInfo == NULL) { -// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// -// tsem_init(&pInfo->sem, 0, 0); -// pInfo->code = 0; -// -// asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); -// -// tsem_wait(&pInfo->sem); -// int32_t code = pInfo->code; -// -// tsem_destroy(&pInfo->sem); -// taosMemoryFree(pInfo); -// -// if (code != TSDB_CODE_SUCCESS) { -// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); -// } + tsem_wait(&pParam->sem); + int32_t code = pParam->code; + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); - return 0; + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code)); + } + + return code; } \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 3c46d17802..a2cda0dcf9 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -34,6 +34,8 @@ namespace { void printSubResults(void* pRes, int32_t* totalRows) { char buf[1024]; + int32_t vgId = tmq_get_vgroup_id(pRes); + int64_t offset = tmq_get_vgroup_offset(pRes); while (1) { TAOS_ROW row = taos_fetch_row(pRes); if (row == NULL) { @@ -45,7 +47,7 @@ void printSubResults(void* pRes, int32_t* totalRows) { int32_t precision = taos_result_precision(pRes); taos_print_row(buf, row, fields, numOfFields); *totalRows += 1; - printf("precision: %d, row content: %s\n", precision, buf); + printf("vgId: %d, offset: %"PRId64", precision: %d, row content: %s\n", vgId, offset, precision, buf); } // taos_free_result(pRes); @@ -1160,6 +1162,7 @@ TEST(clientCase, td_25129) { } while (1) { + printf("start to poll\n"); TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); if (pRes) { char buf[128]; @@ -1173,9 +1176,24 @@ TEST(clientCase, td_25129) { // printf("vgroup id: %d\n", vgroupId); printSubResults(pRes, &totalRows); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } } else { tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); + tmq_commit_sync(tmq, pRes); continue; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f6b3d0ca49..7175f1be74 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5382,6 +5382,48 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { return 0; } + +int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->head.vgId); + pHead->contLen = htonl(tlen + headLen); + } + + return tlen + headLen; +} + +int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + tDecodeCStrTo(&decoder, pReq->subKey); + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 94b804290a..738b7db46a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -726,7 +726,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cbf0933358..b5a7e5fc6b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -228,7 +228,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 99f3c02f13..0b10b62267 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -330,86 +330,124 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t return 0; } -int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - SMqVgOffset vgOffset = {0}; +int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { + SMqSeekReq req = {0}; int32_t vgId = TD_VID(pTq->pVnode); + SRpcMsg rsp = {.info = pMsg->info}; + int code = 0; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { - tqError("vgId:%d failed to decode seek msg", vgId); - return -1; - } - - tDecoderClear(&decoder); - - tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, - vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); - - STqOffset* pOffset = &vgOffset.offset; - if (pOffset->val.type != TMQ_OFFSET__LOG) { - tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); - return -1; + tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); + if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } - STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { - tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); - terrno = TSDB_CODE_INVALID_MSG; - return -1; + tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); + code = 0; + goto end; } // 2. check consumer-vg assignment status taosRLockLatch(&pTq->lock); - if (pHandle->consumerId != vgOffset.consumerId) { - tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, - vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); - terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + if (pHandle->consumerId != req.consumerId) { + tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + req.consumerId, vgId, req.subKey, pHandle->consumerId); taosRUnLockLatch(&pTq->lock); - return -1; - } - taosRUnLockLatch(&pTq->lock); - - // 3. check the offset info - STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); - if (pSavedOffset != NULL) { - if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { - tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); - return 0; // no need to update the offset value - } - - if (pSavedOffset->val.version == pOffset->val.version) { - tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, - pOffset->val.version, pSavedOffset->val.version); - return 0; - } - } - - int64_t sver = 0, ever = 0; - walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - if (pOffset->val.version < sver) { - pOffset->val.version = sver; - } else if (pOffset->val.version > ever) { - pOffset->val.version = ever; + code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + goto end; } - // save the new offset value - if (pSavedOffset != NULL) { - tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, - pSavedOffset->val.version); - } else { - tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); - } - - if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { - tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); - return -1; - } - - tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, - vgOffset.consumerId, vgOffset.offset.val.version); + //if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE, + //otherwise poll data failed after seek. + tqUnregisterPushHandle(pTq, pHandle); + taosRUnLockLatch(&pTq->lock); +end: + rsp.code = code; + tmsgSendRsp(&rsp); return 0; + +// SMqVgOffset vgOffset = {0}; +// int32_t vgId = TD_VID(pTq->pVnode); +// +// SDecoder decoder; +// tDecoderInit(&decoder, (uint8_t*)msg, msgLen); +// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { +// tqError("vgId:%d failed to decode seek msg", vgId); +// return -1; +// } +// +// tDecoderClear(&decoder); +// +// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, +// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); +// +// STqOffset* pOffset = &vgOffset.offset; +// if (pOffset->val.type != TMQ_OFFSET__LOG) { +// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); +// return -1; +// } +// +// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); +// if (pHandle == NULL) { +// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); +// terrno = TSDB_CODE_INVALID_MSG; +// return -1; +// } +// +// // 2. check consumer-vg assignment status +// taosRLockLatch(&pTq->lock); +// if (pHandle->consumerId != vgOffset.consumerId) { +// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, +// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); +// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; +// taosRUnLockLatch(&pTq->lock); +// return -1; +// } +// taosRUnLockLatch(&pTq->lock); +// +// // 3. check the offset info +// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); +// if (pSavedOffset != NULL) { +// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { +// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); +// return 0; // no need to update the offset value +// } +// +// if (pSavedOffset->val.version == pOffset->val.version) { +// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, +// pOffset->val.version, pSavedOffset->val.version); +// return 0; +// } +// } +// +// int64_t sver = 0, ever = 0; +// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); +// if (pOffset->val.version < sver) { +// pOffset->val.version = sver; +// } else if (pOffset->val.version > ever) { +// pOffset->val.version = ever; +// } +// +// // save the new offset value +// if (pSavedOffset != NULL) { +// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, +// pSavedOffset->val.version); +// } else { +// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); +// } +// +// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { +// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); +// return -1; +// } +// +// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, +// vgOffset.consumerId, vgOffset.offset.val.version); +// +// return 0; } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8e9f043f62..8948bae852 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -214,7 +214,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - taosxRsp.reqOffset.type = offset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset + taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 88bd540b85..0d9c478c1b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -466,11 +466,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } break; - case TDMT_VND_TMQ_SEEK_TO_OFFSET: - if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { - goto _err; - } - break; case TDMT_VND_TMQ_ADD_CHECKINFO: if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; @@ -643,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); + case TDMT_VND_TMQ_SEEK: + return tqProcessSeekReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: -- GitLab