From 5cb35f2fa6389e8003287d35a0a02421da1c6ebc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 19 Jul 2023 17:29:39 +0800 Subject: [PATCH] feat:add committed & position & commite_offset interface --- include/client/taos.h | 2 +- include/common/tmsgdef.h | 1 + include/util/taoserror.h | 1 + source/client/src/clientRawBlockWrite.c | 7 + source/client/src/clientTmq.c | 468 +++++++++++++------- source/client/test/clientTests.cpp | 83 ++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 43 ++ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- source/util/src/terror.c | 1 + 11 files changed, 461 insertions(+), 151 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 3cc2d907ab..5ea1510e44 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,7 +288,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); -DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +DLL_EXPORT int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3f4335af94..aa23442291 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,6 +312,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME_PUSH, "vnode-tmq-consume-push", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_COMMITTEDINFO, "vnode-tmq-committed-walinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d6f44f4489..f37402c18c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -779,6 +779,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008) #define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) #define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010) +#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 90b10e0920..dd311db126 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1327,6 +1327,9 @@ end: int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields, int numFields) { + if (!taos || !pData || !tbname) { + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; @@ -1413,6 +1416,9 @@ end: } int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) { + if (!taos || !pData || !tbname) { + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; @@ -1812,6 +1818,7 @@ end: } char* tmq_get_json_meta(TAOS_RES* res) { + if (res == NULL) return NULL; uDebug("tmq_get_json_meta called"); if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { return NULL; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 071b5d6b0f..f2ea7309e4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -220,6 +220,12 @@ typedef struct SMqSeekParam { int32_t code; } SMqSeekParam; +typedef struct SMqCommittedParam { + tsem_t sem; + int32_t code; + SMqVgOffset vgOffset; +} SMqCommittedParam; + typedef struct SMqVgWalInfoParam { int32_t vgId; int32_t epoch; @@ -241,7 +247,7 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; - SMqVgOffset* pOffset; +// SMqVgOffset* pOffset; char topicName[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; tmq_t* pTmq; @@ -292,6 +298,9 @@ void tmq_conf_destroy(tmq_conf_t* conf) { } tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { + if (conf == NULL || key == NULL || value == NULL){ + return TMQ_CONF_INVALID; + } if (strcasecmp(key, "group.id") == 0) { tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN); return TMQ_CONF_OK; @@ -406,6 +415,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); } int32_t tmq_list_append(tmq_list_t* list, const char* src) { + if(list == NULL) return -1; SArray* container = &list->container; if (src == NULL || src[0] == 0) return -1; char* topic = taosStrdup(src); @@ -414,16 +424,19 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { } void tmq_list_destroy(tmq_list_t* list) { + if(list == NULL) return; SArray* container = &list->container; taosArrayDestroyP(container, taosMemoryFree); } int32_t tmq_list_get_size(const tmq_list_t* list) { + if(list == NULL) return -1; const SArray* container = &list->container; return taosArrayGetSize(container); } char** tmq_list_to_c_array(const tmq_list_t* list) { + if(list == NULL) return NULL; const SArray* container = &list->container; return container->pData; } @@ -432,7 +445,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; - taosMemoryFree(pParam->pOffset); +// taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -441,30 +454,25 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { } static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet) { - SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset)); - if (pOffset == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + SMqVgOffset pOffset = {0}; - pOffset->consumerId = tmq->consumerId; - pOffset->offset.val = *offset; + pOffset.consumerId = tmq->consumerId; + pOffset.offset.val = *offset; int32_t groupLen = strlen(tmq->groupId); - memcpy(pOffset->offset.subKey, tmq->groupId, groupLen); - pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR; - strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName); + memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset.offset.subKey + groupLen + 1, pTopicName); int32_t len = 0; int32_t code = 0; - tEncodeSize(tEncodeMqVgOffset, pOffset, len, code); + tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code); if (code < 0) { - taosMemoryFree(pOffset); return TSDB_CODE_INVALID_PARA; } void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); if (buf == NULL) { - taosMemoryFree(pOffset); return TSDB_CODE_OUT_OF_MEMORY; } @@ -474,19 +482,18 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse SEncoder encoder; tEncoderInit(&encoder, abuf, len); - tEncodeMqVgOffset(&encoder, pOffset); + tEncodeMqVgOffset(&encoder, &pOffset); tEncoderClear(&encoder); // build param SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); if (pParam == NULL) { - taosMemoryFree(pOffset); taosMemoryFree(buf); return TSDB_CODE_OUT_OF_MEMORY; } pParam->params = pParamSet; - pParam->pOffset = pOffset; +// pParam->pOffset = pOffset; pParam->vgId = vgId; pParam->pTmq = tmq; @@ -495,7 +502,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse // build send info SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { - taosMemoryFree(pOffset); taosMemoryFree(buf); taosMemoryFree(pParam); return TSDB_CODE_OUT_OF_MEMORY; @@ -553,40 +559,34 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p return pParamSet; } -static SMqClientVg* getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId){ + + +static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s", tmq->consumerId, pTopicName); - - return NULL; + tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + return TSDB_CODE_TMQ_INVALID_TOPIC; } - int32_t j = 0; - int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (j = 0; j < numOfVgroups; j++) { - SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - if (pVg->vgId == vgId) { + int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); + for (int32_t i = 0; i < numOfVgs; ++i) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); + if (pClientVg->vgId == vgId) { + *pVg = pClientVg; break; } } - if (j == numOfVgroups) { - tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, - vgId, numOfVgroups, pTopicName); - return NULL; - } - - SMqClientVg* pVg = (SMqClientVg*)taosArrayGet(pTopic->vgs, j); - return pVg; + return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS; } static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); taosRLockLatch(&tmq->lock); - SMqClientVg* pVg = getClientVg(tmq, pTopicName, vgId); - if(pVg == NULL){ - code = TSDB_CODE_TMQ_INVALID_VGID; + SMqClientVg* pVg = NULL; + code = getClientVg(tmq, pTopicName, vgId, &pVg); + if(code != 0){ goto end; } if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) { @@ -601,7 +601,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, &pVg->offsetInfo.endOffset, pTopicName, pParamSet); + code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); @@ -964,6 +964,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { } int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; if (*topics == NULL) { *topics = tmq_list_new(); } @@ -977,6 +978,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } int32_t tmq_unsubscribe(tmq_t* tmq) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); if (rsp != 0) { @@ -1047,6 +1049,7 @@ static void tmqMgmtInit(void) { } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { + if(conf == NULL) return NULL; taosThreadOnce(&tmqInit, tmqMgmtInit); if (tmqInitRes != 0) { terrno = tmqInitRes; @@ -1140,6 +1143,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); @@ -1264,6 +1268,7 @@ FAIL: } void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { + if(conf == NULL) return; conf->commitCb = cb; conf->commitCbUserParam = param; } @@ -2050,6 +2055,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { + if(tmq == NULL) return NULL; + void* rspObj; int64_t startTime = taosGetTimestampMs(); @@ -2129,6 +2136,8 @@ static void displayConsumeStatistics(tmq_t* pTmq) { } int32_t tmq_consumer_close(tmq_t* tmq) { + if(tmq == NULL) return TSDB_CODE_INVALID_PARA; + tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); @@ -2174,6 +2183,9 @@ const char* tmq_err2str(int32_t err) { } tmq_res_t tmq_get_res_type(TAOS_RES* res) { + if (res == NULL){ + return TMQ_RES_INVALID; + } if (TD_RES_TMQ(res)) { return TMQ_RES_DATA; } else if (TD_RES_TMQ_META(res)) { @@ -2186,6 +2198,9 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) { } const char* tmq_get_topic_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->topic, '.') + 1; @@ -2201,6 +2216,10 @@ const char* tmq_get_topic_name(TAOS_RES* res) { } const char* tmq_get_db_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } + if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return strchr(pRspObj->db, '.') + 1; @@ -2216,6 +2235,9 @@ const char* tmq_get_db_name(TAOS_RES* res) { } int32_t tmq_get_vgroup_id(TAOS_RES* res) { + if (res == NULL){ + return -1; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; return pRspObj->vgId; @@ -2231,6 +2253,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } int64_t tmq_get_vgroup_offset(TAOS_RES* res) { + if (res == NULL){ + return TSDB_CODE_INVALID_PARA; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*) res; STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset; @@ -2254,10 +2279,13 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { } // data from tsdb, no valid offset info - return -1; + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } const char* tmq_get_table_name(TAOS_RES* res) { + if (res == NULL){ + return NULL; + } if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || @@ -2277,6 +2305,10 @@ const char* tmq_get_table_name(TAOS_RES* res) { } void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return; + } if (pRes == NULL) { // here needs to commit all offsets. asyncCommitAllOffsets(tmq, cb, param); } else { // only commit one offset @@ -2291,6 +2323,11 @@ static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { + if (tmq == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + int32_t code = 0; SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); @@ -2314,11 +2351,18 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { } // wal range will be ok after calling tmq_get_topic_assignment or poll interface -static bool isWalRangeOk(SVgOffsetInfo* offset){ - if (offset->walVerBegin != -1 && offset->walVerEnd != -1) { - return true; +static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value){ + if (offset->walVerBegin == -1 || offset->walVerEnd == -1) { + tscError("Assignment or poll interface need to be called first"); + return TSDB_CODE_TMQ_NEED_INITIALIZED; } - return false; + + if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) { + tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd); + return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + } + + return 0; } int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset){ @@ -2332,41 +2376,18 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } - SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; - if (!isWalRangeOk(pOffsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; - } - - if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + code = checkWalRange(pOffsetInfo, offset); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + return code; } taosWUnLockLatch(&tmq->lock); @@ -2384,7 +2405,7 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); tsem_wait(&pInfo->sem); - int32_t code = pInfo->code; + code = pInfo->code; tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); @@ -2394,6 +2415,41 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, return code; } +int32_t tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = NULL; + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ + taosWUnLockLatch(&tmq->lock); + return code; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + code = checkWalRange(pOffsetInfo, offset); + if (code != 0) { + taosWUnLockLatch(&tmq->lock); + return code; + } + taosWUnLockLatch(&tmq->lock); + + STqOffsetVal offsetVal = {.type = TMQ_OFFSET__LOG, .version = offset}; + + code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); + + tscInfo("consumer:0x%" PRIx64 " send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); + + return code; +} + void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; @@ -2525,7 +2581,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } // if no more waiting rsp - pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); + if(pParamSet->callbackFn != NULL){ + pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); + } + taosMemoryFree(pParamSet); // tmq->needReportOffsetRows = true; @@ -2622,8 +2681,104 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){ return false; } +static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { + SMqCommittedParam* pParam = param; + + if (code != 0){ + goto end; + } + if (pMsg) { + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pMsg->pData, pMsg->len); + if (tDecodeMqVgOffset(&decoder, &pParam->vgOffset) < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + tDecoderClear(&decoder); + } + + end: + if(pMsg){ + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + pParam->code = code; + tsem_post(&pParam->sem); + return 0; +} + +int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* epSet){ + int32_t code = 0; + SMqVgOffset pOffset = {0}; + + pOffset.consumerId = tmq->consumerId; + + int32_t groupLen = strlen(tmq->groupId); + memcpy(pOffset.offset.subKey, tmq->groupId, groupLen); + pOffset.offset.subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset.offset.subKey + groupLen + 1, tname); + + int32_t len = 0; + tEncodeSize(tEncodeMqVgOffset, &pOffset, len, code); + if (code < 0) { + return TSDB_CODE_INVALID_PARA; + } + + void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeMqVgOffset(&encoder, &pOffset); + tEncoderClear(&encoder); + + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + taosMemoryFree(buf); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMqCommittedParam* pParam = taosMemoryMalloc(sizeof(SMqCommittedParam)); + if (pParam == NULL) { + taosMemoryFree(buf); + taosMemoryFree(sendInfo); + return TSDB_CODE_OUT_OF_MEMORY; + } + tsem_init(&pParam->sem, 0, 0); + + sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL}; + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmCommittedCb; + sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + + tsem_wait(&pParam->sem); + code = pParam->code; + if(code == TSDB_CODE_SUCCESS){ + if(pParam->vgOffset.offset.val.type == TMQ_OFFSET__LOG){ + code = pParam->vgOffset.offset.val.version; + }else{ + code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + } + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + + return code; +} + int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ - if (tmq == NULL) { + if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2633,60 +2788,103 @@ int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId){ sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } - int32_t type = pVg->offsetInfo.endOffset.type; + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (!isWalRangeOk(&pVg->offsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); + code = checkWalRange(pOffsetInfo, -1); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; + return code; } + SEpSet epSet = pVg->epSet; + int64_t begin = pVg->offsetInfo.walVerBegin; + int64_t end = pVg->offsetInfo.walVerEnd; + taosWUnLockLatch(&tmq->lock); int64_t position = 0; - STqOffsetVal* pOffsetInfo = &pVg->offsetInfo.endOffset; if(type == TMQ_OFFSET__LOG){ - position = pOffsetInfo->version; - }else if(type == TMQ_OFFSET__RESET_EARLIEST){ - position = pVg->offsetInfo.walVerBegin; - }else if(type == TMQ_OFFSET__RESET_LATEST){ - position = pVg->offsetInfo.walVerEnd; + position = pOffsetInfo->endOffset.version; + }else if(type == TMQ_OFFSET__RESET_EARLIEST || type == TMQ_OFFSET__RESET_LATEST){ + code = getCommittedFromServer(tmq, tname, vgId, &epSet); + if(code == TSDB_CODE_TMQ_NO_COMMITTED){ + if(type == TMQ_OFFSET__RESET_EARLIEST){ + position = begin; + } else if(type == TMQ_OFFSET__RESET_LATEST){ + position = end; + } + }else{ + position = code; + } }else{ tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } - taosWUnLockLatch(&tmq->lock); return position; } +int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId){ + if (tmq == NULL || pTopicName == NULL) { + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } + + int32_t accId = tmq->pTscObj->acctId; + char tname[TSDB_TOPIC_FNAME_LEN] = {0}; + sprintf(tname, "%d.%s", accId, pTopicName); + + taosWLockLatch(&tmq->lock); + + SMqClientVg* pVg = NULL; + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ + taosWUnLockLatch(&tmq->lock); + return code; + } + + SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; + if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type); + taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; + } + + int64_t committed = 0; + if(pOffsetInfo->committedOffset.type == TMQ_OFFSET__LOG){ + committed = pOffsetInfo->committedOffset.version; + taosWUnLockLatch(&tmq->lock); + return committed; + } + SEpSet epSet = pVg->epSet; + taosWUnLockLatch(&tmq->lock); + + return getCommittedFromServer(tmq, tname, vgId, &epSet); +} + int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { + if(tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL){ + tscError("invalid tmq handle, null"); + return TSDB_CODE_INVALID_PARA; + } *numOfAssignment = 0; *assignment = NULL; SMqVgCommon* pCommon = NULL; @@ -2881,7 +3079,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { } int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { - if (tmq == NULL) { + if (tmq == NULL || pTopicName == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2891,27 +3089,12 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sprintf(tname, "%d.%s", accId, pTopicName); taosWLockLatch(&tmq->lock); - SMqClientTopic* pTopic = getTopicByName(tmq, tname); - if (pTopic == NULL) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_TOPIC; - } SMqClientVg* pVg = NULL; - int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); - for (int32_t i = 0; i < numOfVgs; ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgId) { - pVg = pClientVg; - break; - } - } - - if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + int32_t code = getClientVg(tmq, tname, vgId, &pVg); + if(code != 0){ taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_INVALID_VGID; + return code; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; @@ -2923,53 +3106,44 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } - if (!isWalRangeOk(&pVg->offsetInfo)) { - tscError("consumer:0x%" PRIx64 " Assignment or poll interface need to be called first", tmq->consumerId); - taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_NEED_INITIALIZED; - } - - if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) { - tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", - tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + code = checkWalRange(pOffsetInfo, -1); + if (code != 0) { taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; + return code; } + tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); // update the offset, and then commit to vnode pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->endOffset.version = offset; pOffsetInfo->beginOffset = pOffsetInfo->endOffset; pVg->seekUpdated = true; - tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); + SEpSet epSet = pVg->epSet; + taosWUnLockLatch(&tmq->lock); SMqSeekReq req = {0}; - snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName); - req.head.vgId = pVg->vgId; + snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, tname); + req.head.vgId = vgId; req.consumerId = tmq->consumerId; int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req); if (msgSize < 0) { - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_PAR_INTERNAL_ERROR; } char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_PAR_INTERNAL_ERROR; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(msg); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2977,7 +3151,6 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pParam == NULL) { taosMemoryFree(msg); taosMemoryFree(sendInfo); - taosWUnLockLatch(&tmq->lock); return TSDB_CODE_OUT_OF_MEMORY; } tsem_init(&pParam->sem, 0, 0); @@ -2991,18 +3164,15 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int64_t transporterId = 0; tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, - tmq->consumerId, pTopic->topicName, vgId, tmq->epoch); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - taosWUnLockLatch(&tmq->lock); + tmq->consumerId, tname, vgId, tmq->epoch); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pParam->sem); - int32_t code = pParam->code; + code = pParam->code; tsem_destroy(&pParam->sem); taosMemoryFree(pParam); - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code)); - } + tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); return code; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6aeb2152d5..bfd6908e16 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1075,6 +1075,89 @@ TEST(clientCase, sub_db_test) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +TEST(clientCase, tmq_commit) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + char topicName[128] = "tp"; + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, topicName); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 2000; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + + int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId); + printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); + position = tmq_position(tmq, topicName, pAssign[i].vgId); + printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position); + } + + while (1) { + printf("start to poll\n"); + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + printSubResults(pRes, &totalRows); + } else { + break; + } + + tmq_commit_sync(tmq, pRes); + for(int i = 0; i < numOfAssign; i++) { + int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId); + printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed); + } + if (pRes != NULL) { + taos_free_result(pRes); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + TEST(clientCase, td_25129) { // taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 738b7db46a..8a5a4e5079 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -732,6 +732,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME_PUSH, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_COMMITTEDINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b5a7e5fc6b..2a5cdbe555 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -232,6 +232,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg); // tq-stream int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0b10b62267..bf0067b128 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -578,6 +578,49 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return code; } +int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { + void* data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + SMqVgOffset vgOffset = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)data, len); + if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tDecoderClear(&decoder); + + STqOffset* pOffset = &vgOffset.offset; + STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); + if (pSavedOffset == NULL) { + return TSDB_CODE_TMQ_NO_COMMITTED; + } + vgOffset.offset = *pSavedOffset; + + int32_t code = 0; + tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code); + if (code < 0) { + return TSDB_CODE_INVALID_PARA; + } + + void* buf = taosMemoryCalloc(1, len); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SEncoder encoder; + tEncoderInit(&encoder, buf, len); + tEncodeMqVgOffset(&encoder, &vgOffset); + tEncoderClear(&encoder); + + SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0}; + + tmsgSendRsp(&rsp); + + return 0; +} + int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0d9c478c1b..204107ee3c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -462,7 +462,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_VND_TMQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, ver, pReq, len) < 0) { goto _err; } break; @@ -638,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); + case TDMT_VND_TMQ_VG_COMMITTEDINFO: + return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_SEEK: return tqProcessSeekReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c36480a63e..83a50f7051 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -633,6 +633,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NEED_INITIALIZED, "Assignment or poll interface need to be called first") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_COMMITTED, "No committed info") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer") -- GitLab