diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa2d4984bf19aaa587654a017ab9f5f0a5ab123d..7c5182d76cc0282c166f285eddc26cf9afd920c9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2036,7 +2036,6 @@ typedef struct { SArray* topicNames; // SArray int8_t withTbName; - int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; @@ -2056,7 +2055,6 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc } tlen += taosEncodeFixedI8(buf, pReq->withTbName); - tlen += taosEncodeFixedI8(buf, pReq->useSnapshot); tlen += taosEncodeFixedI8(buf, pReq->autoCommit); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); @@ -2080,7 +2078,6 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq } buf = taosDecodeFixedI8(buf, &pReq->withTbName); - buf = taosDecodeFixedI8(buf, &pReq->useSnapshot); buf = taosDecodeFixedI8(buf, &pReq->autoCommit); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 26887e2adea1efcda433c437c8af98e94bb0a1ff..e7927cd0aed68704ed66ff07aafce34677cd3f14 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -358,7 +358,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } - if (strcasecmp(key, "enable.heartbeat.background") == 0) { +// if (strcasecmp(key, "enable.heartbeat.background") == 0) { // if (strcasecmp(value, "true") == 0) { // conf->hbBgEnable = true; // return TMQ_CONF_OK; @@ -366,10 +366,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value // conf->hbBgEnable = false; // return TMQ_CONF_OK; // } else { - tscError("the default value of enable.heartbeat.background is true, can not be seted"); - return TMQ_CONF_INVALID; +// tscError("the default value of enable.heartbeat.background is true, can not be seted"); +// return TMQ_CONF_INVALID; // } - } +// } if (strcasecmp(key, "td.connect.ip") == 0) { conf->ip = taosStrdup(value); @@ -423,30 +423,30 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, - int32_t* numOfVgroups) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); - *index = -1; - *numOfVgroups = 0; - - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); - if (strcmp(pTopic->topicName, pName) != 0) { - continue; - } - - *numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (int32_t j = 0; j < (*numOfVgroups); ++j) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->vgId == vgId) { - *index = j; - return pClientVg; - } - } - } - - return NULL; -} +//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, +// int32_t* numOfVgroups) { +// int32_t numOfTopics = taosArrayGetSize(pTopicList); +// *index = -1; +// *numOfVgroups = 0; +// +// for (int32_t i = 0; i < numOfTopics; ++i) { +// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); +// if (strcmp(pTopic->topicName, pName) != 0) { +// continue; +// } +// +// *numOfVgroups = taosArrayGetSize(pTopic->vgs); +// for (int32_t j = 0; j < (*numOfVgroups); ++j) { +// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); +// if (pClientVg->vgId == vgId) { +// *index = j; +// return pClientVg; +// } +// } +// } +// +// return NULL; +//} // Two problems do not need to be addressed here // 1. update to of epset. the response of poll request will automatically handle this problem @@ -573,7 +573,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId); @@ -811,7 +811,9 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; offRows->offset = pVg->offsetInfo.committedOffset; - tscDebug("report offset: %d", offRows->offset.type); + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); + tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); } } tmq->needReportOffsetRows = false; @@ -862,7 +864,7 @@ OVER: static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { if (code != 0) { - tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); + tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); } } @@ -1161,7 +1163,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); + tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1174,7 +1176,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } req.withTbName = tmq->withTbName; - req.useSnapshot = tmq->useSnapshot; req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; @@ -1190,7 +1191,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } tNameExtractFullName(&name, topicFName); - tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); + tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); taosArrayPush(req.topicNames, &topicFName); } @@ -1251,7 +1252,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } @@ -1478,7 +1479,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1531,7 +1532,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); if (epoch <= tmq->epoch) { return false; @@ -1554,14 +1555,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); + tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN]; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; @@ -1591,7 +1592,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) atomic_store_8(&tmq->status, flag); atomic_store_32(&tmq->epoch, epoch); - tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); return set; } @@ -1627,7 +1628,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", + tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", tmq->consumerId, head->epoch, epoch); if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) { @@ -1639,7 +1640,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { } } else { - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, + tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, head->epoch, epoch); } @@ -2067,12 +2068,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, + tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); taosMsleep(500); // sleep for a while return NULL; } @@ -2084,7 +2085,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); + tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } } @@ -2093,7 +2094,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) { - tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); + tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); } rspObj = tmqHandleAllRsp(tmq, timeout, false); @@ -2101,7 +2102,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); return NULL; } @@ -2109,7 +2110,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; if (elapsedTime > timeout) { - tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } @@ -2142,7 +2143,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) { } int32_t tmq_consumer_close(tmq_t* tmq) { - tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); + tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); if (tmq->status == TMQ_CONSUMER_STATUS__READY) { @@ -2169,7 +2170,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tmq_list_destroy(lst); } else { - tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); @@ -2432,7 +2433,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); + tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); @@ -2656,7 +2657,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a char offsetFormatBuf[TSDB_OFFSET_LEN]; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, + tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); } @@ -2693,7 +2694,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a char offsetBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; @@ -2772,7 +2773,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); - tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4b27f50f4191ddd7221410d6addd46472aa29b38..07c74927e148fce0663c377d053363fa69750460 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -361,11 +361,7 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 62beee03037a5b384a0e80a94c419610c647047c..44bd8f74c8d19fa55ea900def0f5435557098d63 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -553,7 +553,6 @@ typedef struct { int64_t rebalanceTime; int8_t withTbName; - int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c8ee35f402d9219068a289757676003d4bb3faac..4dded61ce3fbbe4663a1047ee6aa1b9562eda767 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -697,7 +697,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); pConsumerNew->withTbName = subscribe.withTbName; - pConsumerNew->useSnapshot = subscribe.useSnapshot; pConsumerNew->autoCommit = subscribe.autoCommit; pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; @@ -1186,25 +1185,16 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false); - - char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; - tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal); - varDataSetLen(buf, strlen(varDataVal(buf))); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); + + char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); + varDataSetLen(parasStr, strlen(varDataVal(parasStr))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); + colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false); numOfRows++; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index f0fa40cacf2b6eb7f32a334e914f1c284f2db589..09c4053f9319754370117dd6d4fa71174d75c7f8 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -326,7 +326,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { } tlen += taosEncodeFixedI8(buf, pConsumer->withTbName); - tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot); tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); @@ -386,7 +385,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s if(sver > 1){ buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); - buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); diff --git a/tests/system-test/7-tmq/checkOffsetRowParams.py b/tests/system-test/7-tmq/checkOffsetRowParams.py index 17c80c68bfb9cf59bee2028eff16c2c15725659a..8a24148064c8125513f07bab0745865a167126cd 100644 --- a/tests/system-test/7-tmq/checkOffsetRowParams.py +++ b/tests/system-test/7-tmq/checkOffsetRowParams.py @@ -243,6 +243,10 @@ class TDTestCase: tdSql.checkData(0, 5, 0) break + tdSql.query("show consumers") + tdSql.checkRows(1) + tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest") + time.sleep(2) tdLog.info("start insert data") self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])