From c654f1145d7a86688a3abcf98947ee96616ba8f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 10:59:16 +0800 Subject: [PATCH] enh(stream): add new msg for seek, and do some internal refactor. --- include/common/tmsg.h | 58 ++------ include/common/tmsgdef.h | 1 + include/libs/wal/wal.h | 3 +- source/client/src/clientRawBlockWrite.c | 8 +- source/client/src/clientTmq.c | 80 ++++++++--- source/client/test/clientTests.cpp | 64 +++++---- source/common/src/tmsg.c | 48 ++++++- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 11 +- source/dnode/vnode/src/inc/tq.h | 10 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 124 +++++++++-------- source/dnode/vnode/src/tq/tqPush.c | 6 +- source/dnode/vnode/src/tq/tqUtil.c | 142 +++++++++++++------- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 + source/libs/wal/src/walRead.c | 7 + 16 files changed, 355 insertions(+), 214 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb2450e8f7..9e2dbe2f7a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3101,6 +3101,8 @@ typedef struct { int32_t code; int32_t epoch; int64_t consumerId; + int64_t walsver; + int64_t walever; } SMqRspHead; typedef struct { @@ -3147,43 +3149,9 @@ typedef struct { SSchemaWrapper schema; } SMqSubTopicEp; -static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pTopicEp->topic); - tlen += taosEncodeString(buf, pTopicEp->db); - int32_t sz = taosArrayGetSize(pTopicEp->vgs); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); - tlen += tEncodeSMqSubVgEp(buf, pVgEp); - } - tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { - buf = taosDecodeStringTo(buf, pTopicEp->topic); - buf = taosDecodeStringTo(buf, pTopicEp->db); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); - if (pTopicEp->vgs == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqSubVgEp vgEp; - buf = tDecodeSMqSubVgEp(buf, &vgEp); - taosArrayPush(pTopicEp->vgs, &vgEp); - } - buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); - return buf; -} - -static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - taosMemoryFreeClear(pSubTopicEp->schema.pSchema); - pSubTopicEp->schema.nCols = 0; - taosArrayDestroy(pSubTopicEp->vgs); -} +int32_t tEncodeMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp); +void* tDecodeMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp); +void tDeleteMqSubTopicEp(SMqSubTopicEp* pSubTopicEp); typedef struct { SMqRspHead head; @@ -3193,8 +3161,8 @@ typedef struct { void* metaRsp; } SMqMetaRsp; -int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp); -int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); +int32_t tEncodeMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp); +int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); typedef struct { SMqRspHead head; @@ -3209,9 +3177,9 @@ typedef struct { SArray* blockSchema; } SMqDataRsp; -int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); -int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); -void tDeleteSMqDataRsp(SMqDataRsp* pRsp); +int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); +int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +void tDeleteMqDataRsp(SMqDataRsp* pRsp); typedef struct { SMqRspHead head; @@ -3247,7 +3215,7 @@ static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pR tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i); - tlen += tEncodeSMqSubTopicEp(buf, pVgEp); + tlen += tEncodeMqSubTopicEp(buf, pVgEp); } return tlen; } @@ -3262,14 +3230,14 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) { } for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp topicEp; - buf = tDecodeSMqSubTopicEp(buf, &topicEp); + buf = tDecodeMqSubTopicEp(buf, &topicEp); taosArrayPush(pRsp->topics, &topicEp); } return buf; } static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { - taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteSMqSubTopicEp); + taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp); } #define TD_AUTO_CREATE_TABLE 0x1 diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6cf6140815..c10e6415e1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -300,6 +300,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK_TO_OFFSET, "vnode-tmq-seekto-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b51289de5e..1e3974f5cc 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -190,7 +190,7 @@ int32_t walApplyVer(SWal *, int64_t ver); // int32_t walDataCorrupted(SWal*); -// read +// wal reader SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond); void walCloseReader(SWalReader *pRead); void walReadReset(SWalReader *pReader); @@ -198,6 +198,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver); int32_t walReadSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader* pReader); +void walReaderValidVersionRange(SWalReader* pReader, int64_t *sver, int64_t *ever); // only for tq usage void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index a09780dc15..32f28e4563 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1511,7 +1511,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { rspObj.resType = RES_TYPE__TMQ; tDecoderInit(&decoder, data, dataLen); - code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp); + code = tDecodeMqDataRsp(&decoder, &rspObj.rsp); if (code != 0) { uError("WriteRaw:decode smqDataRsp error"); code = TSDB_CODE_INVALID_MSG; @@ -1615,7 +1615,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = pRequest->code; end: - tDeleteSMqDataRsp(&rspObj.rsp); + tDeleteMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1858,7 +1858,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { int32_t len = 0; int32_t code = 0; - tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code); + tEncodeSize(tEncodeMqDataRsp, &rspObj->rsp, len, code); if (code < 0) { return -1; } @@ -1866,7 +1866,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { void* buf = taosMemoryCalloc(1, len); SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); - tEncodeSMqDataRsp(&encoder, &rspObj->rsp); + tEncodeMqDataRsp(&encoder, &rspObj->rsp); tEncoderClear(&encoder); raw->raw = buf; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d24fae4211..693daee1fc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -225,7 +225,7 @@ static int32_t doAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups); + int32_t index, int32_t totalVgroups, int32_t type); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param); static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param); @@ -473,7 +473,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet, - int32_t index, int32_t totalVgroups) { + int32_t index, int32_t totalVgroups, int32_t type) { STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); if (pOffset == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -539,7 +539,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN pMsgSendInfo->param = pParam; pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb; - pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; + pMsgSendInfo->msgType = type; atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); @@ -575,7 +575,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) { +static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) { char* pTopicName = NULL; int32_t vgId = 0; int32_t code = 0; @@ -645,7 +645,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); + code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type); // failed to commit, callback user function directly. if (code != TSDB_CODE_SUCCESS) { @@ -686,7 +686,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) { - int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups); + int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(terrno), @@ -1328,7 +1328,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); + tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -1339,7 +1339,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp); + tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) { @@ -1808,8 +1808,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // update the local offset value only for the returned values. pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; + + // update the status atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // update the valid wal version range + pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver; + pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; + char buf[80]; tFormatOffset(buf, 80, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { @@ -1837,6 +1843,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + // todo handle the wal range and epset for each vgroup SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -2128,11 +2135,11 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* if (pRes == NULL) { // here needs to commit all offsets. asyncCommitAllOffsets(tmq, cb, param); } else { // only commit one offset - asyncCommitOffset(tmq, pRes, cb, param); + asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param); } } -static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) { +static void commitCallBackFn(tmq_t *UNUSED_PARAM(pTmq), int32_t code, void* param) { SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param; pInfo->code = code; tsem_post(&pInfo->sem); @@ -2148,7 +2155,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { if (pRes == NULL) { asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); } else { - asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo); + asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo); } tsem_wait(&pInfo->sem); @@ -2339,13 +2346,25 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = 0; *assignment = NULL; - SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); + int32_t accId = tmq->pTscObj->acctId; + char tname[128] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { return TSDB_CODE_INVALID_PARA; } // in case of snapshot is opened, no valid offset will return *numOfAssignment = taosArrayGetSize(pTopic->vgs); + + *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment)); + if (*assignment == NULL) { + tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId, + (*numOfAssignment) * sizeof(tmq_topic_assignment)); + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); @@ -2370,9 +2389,13 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle return TSDB_CODE_INVALID_PARA; } - SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); + int32_t accId = tmq->pTscObj->acctId; + char tname[128] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - tscError("consumer:0x:" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); return TSDB_CODE_INVALID_PARA; } @@ -2387,7 +2410,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle } if (pVg == NULL) { - tscError("consumer:0x:" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); return TSDB_CODE_INVALID_PARA; } @@ -2395,26 +2418,43 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle int32_t type = pOffsetInfo->currentOffset.type; if (type != TMQ_OFFSET__LOG) { - tscError("consumer:0x:" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); return TSDB_CODE_INVALID_PARA; } if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { - tscError("consumer:0x:" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset); + tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset); return TSDB_CODE_INVALID_PARA; } // update the offset, and then commit to vnode if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { pOffsetInfo->currentOffset.version = offset; - pOffsetInfo->committedOffset.version = offset; + pOffsetInfo->committedOffset.version = INT64_MIN; } SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; - tstrncpy(rspObj.topic, pTopicName, tListLen(rspObj.topic)); + tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); - int32_t code = tmq_commit_sync(tmq, &rspObj); + + SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); + if (pInfo == NULL) { + tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); + return TSDB_CODE_OUT_OF_MEMORY; + } + + tsem_init(&pInfo->sem, 0, 0); + pInfo->code = 0; + + asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); + + tsem_wait(&pInfo->sem); + int32_t code = pInfo->code; + + tsem_destroy(&pInfo->sem); + taosMemoryFree(pInfo); + if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 2674bc0c59..734577bd1d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -30,6 +30,27 @@ #include "taos.h" namespace { + +void printSubResults(void* pRes, int32_t* totalRows) { + char buf[1024]; + + while (1) { + TAOS_ROW row = taos_fetch_row(pRes); + if (row == NULL) { + break; + } + + TAOS_FIELD* fields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_field_count(pRes); + int32_t precision = taos_result_precision(pRes); + taos_print_row(buf, row, fields, numOfFields); + *totalRows += 1; + printf("precision: %d, row content: %s\n", precision, buf); + } + +// taos_free_result(pRes); +} + void showDB(TAOS* pConn) { TAOS_RES* pRes = taos_query(pConn, "show databases"); TAOS_ROW pRow = NULL; @@ -1059,13 +1080,13 @@ TEST(clientCase, sub_tb_test) { ASSERT_NE(pConn, nullptr); tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "group.id", "cgrpName45"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "experimental.snapshot.enable", "false"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -1074,7 +1095,7 @@ TEST(clientCase, sub_tb_test) { // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "topic_t2"); + tmq_list_append(topicList, "topic_t1"); // 启动订阅 tmq_subscribe(tmq, topicList); @@ -1089,11 +1110,15 @@ TEST(clientCase, sub_tb_test) { int32_t count = 0; + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + TAOS_RES* p = tmq_consumer_poll(tmq, timeout); + int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign); + while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); - if (pRes) { - char buf[1024]; - + if (pRes != NULL) { const char* topicName = tmq_get_topic_name(pRes); const char* dbName = tmq_get_db_name(pRes); int32_t vgroupId = tmq_get_vgroup_id(pRes); @@ -1102,27 +1127,18 @@ TEST(clientCase, sub_tb_test) { printf("db: %s\n", dbName); printf("vgroup id: %d\n", vgroupId); - while (1) { - TAOS_ROW row = taos_fetch_row(pRes); - if (row == NULL) { - break; - } - - fields = taos_fetch_fields(pRes); - numOfFields = taos_field_count(pRes); - precision = taos_result_precision(pRes); - taos_print_row(buf, row, fields, numOfFields); - totalRows += 1; - printf("precision: %d, row content: %s\n", precision, buf); - } + printSubResults(pRes, &totalRows); + } else { +// tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); +// break; + } + tmq_commit_sync(tmq, pRes); + if (pRes != NULL) { taos_free_result(pRes); - // if ((++count) > 1) { - // break; - // } - } else { - break; } + + tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); } tmq_consumer_close(tmq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0a18941780..70c9ae4732 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6973,21 +6973,21 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { return 0; } -int32_t tEncodeSMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) { +int32_t tEncodeMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeI16(pEncoder, pRsp->resMsgType)) return -1; if (tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1; return 0; } -int32_t tDecodeSMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) { +int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) { if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1; if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t *)&pRsp->metaRspLen) < 0) return -1; return 0; } -int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { +int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; @@ -7012,7 +7012,7 @@ int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { return 0; } -int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { +int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; @@ -7057,7 +7057,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } -void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { +void tDeleteMqDataRsp(SMqDataRsp *pRsp) { pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); pRsp->blockData = NULL; @@ -7539,3 +7539,41 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } } } + +int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pTopicEp->topic); + tlen += taosEncodeString(buf, pTopicEp->db); + int32_t sz = taosArrayGetSize(pTopicEp->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp *pVgEp = (SMqSubVgEp *)taosArrayGet(pTopicEp->vgs, i); + tlen += tEncodeSMqSubVgEp(buf, pVgEp); + } + tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); + return tlen; +} + +void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) { + buf = taosDecodeStringTo(buf, pTopicEp->topic); + buf = taosDecodeStringTo(buf, pTopicEp->db); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); + if (pTopicEp->vgs == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp vgEp; + buf = tDecodeSMqSubVgEp(buf, &vgEp); + taosArrayPush(pTopicEp->vgs, &vgEp); + } + buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); + return buf; +} + +void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { + taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + pSubTopicEp->schema.nCols = 0; + taosArrayDestroy(pSubTopicEp->vgs); +} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d61eb3ec03..7323d23486 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -517,6 +517,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1b146506a2..0d75b5fd68 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -556,9 +556,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { return -1; } - ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; - ((SMqRspHead *)buf)->epoch = serverEpoch; - ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; + SMqRspHead* pHead = buf; + + pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + pHead->epoch = serverEpoch; + pHead->consumerId = pConsumer->consumerId; + pHead->walsver = 0; + pHead->walever = 0; + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqAskEpRsp(&abuf, &rsp); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index db17e4f533..5a6f5249b3 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -106,6 +106,7 @@ typedef struct { SMqDataRsp* pDataRsp; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; SRpcHandleInfo info; + STqHandle* pHandle; } STqPushEntry; struct STQ { @@ -145,8 +146,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); -int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, + int32_t type, int32_t vgId); +int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId); // tqMeta int32_t tqMetaOpen(STQ* pTq); @@ -182,13 +184,13 @@ int32_t tqStreamTasksScanWal(STQ* pTq); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); -int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, + int32_t type, int64_t sver, int64_t ever); void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); void saveOffsetForAllTasks(STQ* pTq, int64_t ver); void initOffsetForAllRestoreTasks(STQ* pTq); -int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52a..764e57eb09 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -206,6 +206,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7c7f59b6b7..5e4ede914a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -76,7 +76,7 @@ static void destroyTqHandle(void* data) { static void tqPushEntryFree(void* data) { STqPushEntry* p = *(void**)data; if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tDeleteSMqDataRsp(p->pDataRsp); + tDeleteMqDataRsp(p->pDataRsp); } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); } @@ -154,71 +154,30 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } -static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, - int64_t consumerId, int32_t type) { - int32_t len = 0; - int32_t code = 0; - - if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); - } - - if (code < 0) { - return -1; - } - - int32_t tlen = sizeof(SMqRspHead) + len; - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = type; - ((SMqRspHead*)buf)->epoch = epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, abuf, len); - - if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeSMqDataRsp(&encoder, pRsp); - } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); - } - - tEncoderClear(&encoder); - - SRpcMsg rsp = { - .info = *pRpcHandleInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - - tmsgSendRsp(&rsp); - return 0; -} - -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { +int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId) { SMqDataRsp* pRsp = pPushEntry->pDataRsp; SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; - doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); + + int64_t sver = 0, ever = 0; + walReaderValidVersionRange(pPushEntry->pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + + tqDoSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType, sver, ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + vgId, pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; } -int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) { - doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type); +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, + int32_t type, int32_t vgId) { + int64_t sver = 0, ever = 0; + walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + + tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever); char buf1[80] = {0}; char buf2[80] = {0}; @@ -226,7 +185,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -259,6 +218,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) { + tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64, + vgId, offset.subKey, offset.val.version, pSavedOffset->val.version); return 0; // no need to update the offset value } @@ -277,6 +238,57 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t return 0; } +int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + STqOffset offset = {0}; + int32_t vgId = TD_VID(pTq->pVnode); + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + return -1; + } + + tDecoderClear(&decoder); + + if (offset.val.type != TMQ_OFFSET__LOG) { + tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, offset.subKey, offset.val.type); + return -1; + } + + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pSavedOffset != NULL && pSavedOffset->val.type != TMQ_OFFSET__LOG) { + tqError("invalid saved offset type, vgId:%d sub:%s", vgId, offset.subKey); + return 0; // no need to update the offset value + } + + if (pSavedOffset->val.version == offset.val.version) { + tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version, + pSavedOffset->val.version); + return 0; + } + + STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); + + int64_t sver = 0, ever = 0; + walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + if (offset.val.version < sver) { + offset.val.version = sver; + } else if (offset.val.version > ever) { + offset.val.version = ever; + } + + // save the new offset value + tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version, + pSavedOffset->val.version); + + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { + tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, offset.subKey, offset.val.version); + return -1; + } + + return 0; +} + int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { void* pIter = NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b7454..43463f67b7 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -263,7 +263,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6 if (pRsp->blockNum > 0) { tqOffsetResetToLog(&pRsp->rspOffset, ver); - tqPushDataRsp(pTq, pPushEntry); + tqPushDataRsp(pPushEntry, vgId); recordPushedEntry(pCachedKey, pIter); } } @@ -376,6 +376,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest return -1; } + pPushEntry->pHandle = pHandle; pPushEntry->info = pRpcMsg->info; memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -388,6 +389,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest } SMqRspHead* pHead = &pPushEntry->pDataRsp->head; + pHead->consumerId = consumerId; pHead->epoch = pRequest->epoch; pHead->mqMsgType = type; @@ -411,7 +413,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint6 (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); if (rspConsumer) { // rsp the old consumer with empty block. - tqPushDataRsp(pTq, *pEntry); + tqPushDataRsp(*pEntry, vgId); } taosHashRemove(pTq->pPushMgr, pKey, keyLen); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5ac747947f..b34adcec56 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -17,7 +17,7 @@ #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) -static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); +static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { char buf[128] = {0}; @@ -219,8 +219,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version); - int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - tDeleteSMqDataRsp(&dataRsp); + int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); + tDeleteMqDataRsp(&dataRsp); *pBlockReturned = true; return code; @@ -228,7 +228,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); *pBlockReturned = true; @@ -247,6 +247,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { + char buf[80] = {0}; uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); @@ -257,37 +258,32 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWLockLatch(&pTq->lock); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); - if(code != 0) { - goto end; - } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - taosWUnLockLatch(&pTq->lock); - return code; - } + int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + if (code == 0) { + // till now, all data has been transferred to consumer, new data needs to push client once arrived. + if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { + code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + taosWUnLockLatch(&pTq->lock); + return code; + } - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); + // NOTE: this pHandle->consumerId may have been changed already. + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); + } - // NOTE: this pHandle->consumerId may have been changed already. + tFormatOffset(buf, 80, &dataRsp.rspOffset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 + " code:%d", + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); + taosWUnLockLatch(&pTq->lock); + tDeleteMqDataRsp(&dataRsp); - end: - { - char buf[80] = {0}; - tFormatOffset(buf, 80, &dataRsp.rspOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); - } return code; } - static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { int code = 0; int32_t vgId = TD_VID(pTq->pVnode); @@ -303,7 +299,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); @@ -314,7 +310,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); return code; }else { @@ -322,7 +318,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } } - if (offset->type == TMQ_OFFSET__LOG) { int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); @@ -337,13 +332,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t savedEpoch = atomic_load_32(&pHandle->epoch); if (savedEpoch > pRequest->epoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); + ", found new consumer epoch %d, discard req epoch %d", + pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); break; } if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -357,7 +353,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if(totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -368,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { + if (tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId) < 0) { code = -1; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); @@ -398,7 +394,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -446,10 +442,19 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } -int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { +static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver, + int64_t ever) { + pMsgHead->consumerId = consumerId; + pMsgHead->epoch = epoch; + pMsgHead->mqMsgType = type; + pMsgHead->walsver = sver; + pMsgHead->walever = ever; +} + +int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) { int32_t len = 0; int32_t code = 0; - tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code); + tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code); if (code < 0) { return -1; } @@ -459,27 +464,64 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return -1; } - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = pReq->consumerId; + int64_t sver = 0, ever = 0; + walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + initMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); SEncoder encoder = {0}; tEncoderInit(&encoder, abuf, len); - tEncodeSMqMetaRsp(&encoder, pRsp); + tEncodeMqMetaRsp(&encoder, pRsp); tEncoderClear(&encoder); - SRpcMsg resp = { - .info = pMsg->info, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); + SRpcMsg resp = { .info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0 }; - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type); + tmsgSendRsp(&resp); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId, + pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type); return 0; } + +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, + int32_t type, int64_t sver, int64_t ever) { + int32_t len = 0; + int32_t code = 0; + + if (type == TMQ_MSG_TYPE__POLL_RSP) { + tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); + } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); + } + + if (code < 0) { + return -1; + } + + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + SMqRspHead* pHead = (SMqRspHead*)buf; + initMqRspHead(pHead, type, epoch, consumerId, sver, ever); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + + if (type == TMQ_MSG_TYPE__POLL_RSP) { + tEncodeMqDataRsp(&encoder, pRsp); + } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); + } + + tEncoderClear(&encoder); + SRpcMsg rsp = { .info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0 }; + + tmsgSendRsp(&rsp); + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b62bf27def..d6e8e19043 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -389,6 +389,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp goto _err; } break; + case TDMT_VND_TMQ_SEEK_TO_OFFSET: + if (tqProcessSeekReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; + } + break; case TDMT_VND_TMQ_ADD_CHECKINFO: if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) { goto _err; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de..19694a6126 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -102,6 +102,13 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } +void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) { + *sver = walGetFirstVer(pReader->pWal); + int64_t lastVer = walGetLastVer(pReader->pWal); + int64_t committedVer = walGetCommittedVer(pReader->pWal); + *ever = pReader->cond.scanUncommited ? lastVer : committedVer; +} + static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; -- GitLab