diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4b0a741213128b54ccd314e0a4c71a787d2d9e27..aa0a243e6866b65f7a702901d65dc393ca9d0ce0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2901,7 +2901,7 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe // tqOffset enum { TMQ_OFFSET__RESET_NONE = -3, - TMQ_OFFSET__RESET_EARLIEAST = -2, + TMQ_OFFSET__RESET_EARLIEST = -2, TMQ_OFFSET__RESET_LATEST = -1, TMQ_OFFSET__LOG = 1, TMQ_OFFSET__SNAPSHOT_DATA = 2, diff --git a/include/util/tdef.h b/include/util/tdef.h index 2f566ef696fd4f5256211b673ded2d6248a18f27..8122f94a110435f513a15a2e7c194383aaa93ae8 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -195,7 +195,7 @@ typedef enum ELogicConditionType { #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string #define TSDB_CGROUP_LEN 193 // it is a null-terminated string -#define TSDB_OFFSET_LEN 80 // it is a null-terminated string +#define TSDB_OFFSET_LEN 64 // it is a null-terminated string #define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string #define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 65 diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c15f6d55bccdedb650fab889bb0e37fd9d366f05..cb38996ea13c9d88f2ce1fffa039c65f3d06ff2d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -264,7 +264,7 @@ tmq_conf_t* tmq_conf_new() { conf->withTbName = false; conf->autoCommit = true; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; - conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; + conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST; conf->hbBgEnable = true; return conf; @@ -318,7 +318,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value conf->resetOffset = TMQ_OFFSET__RESET_NONE; return TMQ_CONF_OK; } else if (strcasecmp(value, "earliest") == 0) { - conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST; + conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST; return TMQ_CONF_OK; } else if (strcasecmp(value, "latest") == 0) { conf->resetOffset = TMQ_OFFSET__RESET_LATEST; @@ -809,7 +809,6 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; offRows->offset = pVg->offsetInfo.committedOffset; - tscDebug("report row:%lldd, offset:%" PRId64, offRows->rows, offRows->offset.version); } } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 21721d85a5b7845980918cc4147c0fedd5fe5ac3..722092a043fda4a324cd3893b7bf98829b9ba5ad 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -361,11 +361,11 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "withTbName", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "useSnapshot", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "autoCommit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "autoCommitInterval", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "resetOffsetCfg", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, + {.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, + {.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, + {.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f14375f9bf841ef75e8aafbbda6dbd04eaf84739..40ed29dc9944b4b1ae669ca8082f646010c67970 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7131,15 +7131,15 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { if (pVal->type == TMQ_OFFSET__RESET_NONE) { - snprintf(buf, maxLen, "offset(reset to none)"); - } else if (pVal->type == TMQ_OFFSET__RESET_EARLIEAST) { - snprintf(buf, maxLen, "offset(reset to earlieast)"); + snprintf(buf, maxLen, "none"); + } else if (pVal->type == TMQ_OFFSET__RESET_EARLIEST) { + snprintf(buf, maxLen, "earliest"); } else if (pVal->type == TMQ_OFFSET__RESET_LATEST) { - snprintf(buf, maxLen, "offset(reset to latest)"); + snprintf(buf, maxLen, "latest"); } else if (pVal->type == TMQ_OFFSET__LOG) { - snprintf(buf, maxLen, "offset(log) ver:%" PRId64, pVal->version); + snprintf(buf, maxLen, "log:%" PRId64, pVal->version); } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) { - snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts); + snprintf(buf, maxLen, "snapshot:%" PRId64 "|%" PRId64, pVal->uid, pVal->ts); } else { return TSDB_CODE_INVALID_PARA; } @@ -7157,7 +7157,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { return pLeft->uid == pRight->uid; } else { ASSERT(0); - /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ + /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/ /*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/ /*return true;*/ } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4a6a37395d4de47d4d3b7e0805787003d8ae0414..fc1a30ef5d291e45f3fd66347344ed1aade99ff3 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -1197,8 +1197,13 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false); + char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; + tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal); + varDataSetLen(buf, strlen(varDataVal(buf))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->resetOffsetCfg, false); + colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); numOfRows++; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0bb3b7575aa79fcbf99f3e0cb5ba9549556c60cd..c10564a36985774b0ecd2701e5a54f80f5dd17a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -559,7 +559,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { } else { dataRsp.rspOffset.version = currentVer; // return current consume offset value } - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { + } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { dataRsp.rspOffset.version = ever; @@ -754,7 +754,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosArrayDestroy(tbUidList); } - taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); +id taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index e67b8906a9bc455da10ea082d511d0d1717112ff..8607fd754e33069afb9a620651e516780b6544b8 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -107,7 +107,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } else { // no poll occurs in this vnode for this topic, let's seek to the right offset value. - if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { if (pRequest->useSnapshot) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", consumerId, pHandle->subKey, vgId);