未验证 提交 03413314 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21716 from taosdata/feature/TS-3495

fix:open info log in tmq client & modify parametes in show consumers
...@@ -2036,7 +2036,6 @@ typedef struct { ...@@ -2036,7 +2036,6 @@ typedef struct {
SArray* topicNames; // SArray<char**> SArray* topicNames; // SArray<char**>
int8_t withTbName; int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int8_t resetOffsetCfg; int8_t resetOffsetCfg;
...@@ -2056,7 +2055,6 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc ...@@ -2056,7 +2055,6 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
} }
tlen += taosEncodeFixedI8(buf, pReq->withTbName); tlen += taosEncodeFixedI8(buf, pReq->withTbName);
tlen += taosEncodeFixedI8(buf, pReq->useSnapshot);
tlen += taosEncodeFixedI8(buf, pReq->autoCommit); tlen += taosEncodeFixedI8(buf, pReq->autoCommit);
tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval);
tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg);
...@@ -2080,7 +2078,6 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq ...@@ -2080,7 +2078,6 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
} }
buf = taosDecodeFixedI8(buf, &pReq->withTbName); buf = taosDecodeFixedI8(buf, &pReq->withTbName);
buf = taosDecodeFixedI8(buf, &pReq->useSnapshot);
buf = taosDecodeFixedI8(buf, &pReq->autoCommit); buf = taosDecodeFixedI8(buf, &pReq->autoCommit);
buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval);
buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg);
......
...@@ -358,7 +358,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -358,7 +358,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcasecmp(key, "enable.heartbeat.background") == 0) { // if (strcasecmp(key, "enable.heartbeat.background") == 0) {
// if (strcasecmp(value, "true") == 0) { // if (strcasecmp(value, "true") == 0) {
// conf->hbBgEnable = true; // conf->hbBgEnable = true;
// return TMQ_CONF_OK; // return TMQ_CONF_OK;
...@@ -366,10 +366,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -366,10 +366,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
// conf->hbBgEnable = false; // conf->hbBgEnable = false;
// return TMQ_CONF_OK; // return TMQ_CONF_OK;
// } else { // } else {
tscError("the default value of enable.heartbeat.background is true, can not be seted"); // tscError("the default value of enable.heartbeat.background is true, can not be seted");
return TMQ_CONF_INVALID; // return TMQ_CONF_INVALID;
// } // }
} // }
if (strcasecmp(key, "td.connect.ip") == 0) { if (strcasecmp(key, "td.connect.ip") == 0) {
conf->ip = taosStrdup(value); conf->ip = taosStrdup(value);
...@@ -423,30 +423,30 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { ...@@ -423,30 +423,30 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
return container->pData; return container->pData;
} }
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, //static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
int32_t* numOfVgroups) { // int32_t* numOfVgroups) {
int32_t numOfTopics = taosArrayGetSize(pTopicList); // int32_t numOfTopics = taosArrayGetSize(pTopicList);
*index = -1; // *index = -1;
*numOfVgroups = 0; // *numOfVgroups = 0;
//
for (int32_t i = 0; i < numOfTopics; ++i) { // for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); // SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
if (strcmp(pTopic->topicName, pName) != 0) { // if (strcmp(pTopic->topicName, pName) != 0) {
continue; // continue;
} // }
//
*numOfVgroups = taosArrayGetSize(pTopic->vgs); // *numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < (*numOfVgroups); ++j) { // for (int32_t j = 0; j < (*numOfVgroups); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); // SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->vgId == vgId) { // if (pClientVg->vgId == vgId) {
*index = j; // *index = j;
return pClientVg; // return pClientVg;
} // }
} // }
} // }
//
return NULL; // return NULL;
} //}
// Two problems do not need to be addressed here // Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem // 1. update to of epset. the response of poll request will automatically handle this problem
...@@ -573,7 +573,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN ...@@ -573,7 +573,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
totalVgroups, pMsgSendInfo->requestId); totalVgroups, pMsgSendInfo->requestId);
...@@ -811,7 +811,9 @@ void tmqSendHbReq(void* param, void* tmrId) { ...@@ -811,7 +811,9 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->vgId = pVg->vgId; offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows; offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.committedOffset; offRows->offset = pVg->offsetInfo.committedOffset;
tscDebug("report offset: %d", offRows->offset.type); char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
} }
} }
tmq->needReportOffsetRows = false; tmq->needReportOffsetRows = false;
...@@ -862,7 +864,7 @@ OVER: ...@@ -862,7 +864,7 @@ OVER:
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
if (code != 0) { if (code != 0) {
tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
} }
} }
...@@ -1161,7 +1163,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1161,7 +1163,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq req = {0}; SCMSubscribeReq req = {0};
int32_t code = 0; int32_t code = 0;
tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.clientId, tmq->clientId, 256);
...@@ -1174,7 +1176,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1174,7 +1176,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
req.withTbName = tmq->withTbName; req.withTbName = tmq->withTbName;
req.useSnapshot = tmq->useSnapshot;
req.autoCommit = tmq->autoCommit; req.autoCommit = tmq->autoCommit;
req.autoCommitInterval = tmq->autoCommitInterval; req.autoCommitInterval = tmq->autoCommitInterval;
req.resetOffsetCfg = tmq->resetOffsetCfg; req.resetOffsetCfg = tmq->resetOffsetCfg;
...@@ -1190,7 +1191,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1190,7 +1191,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
tNameExtractFullName(&name, topicFName); tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName); taosArrayPush(req.topicNames, &topicFName);
} }
...@@ -1251,7 +1252,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1251,7 +1252,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto FAIL; goto FAIL;
} }
tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500); taosMsleep(500);
} }
...@@ -1478,7 +1479,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic ...@@ -1478,7 +1479,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) { for (int32_t j = 0; j < vgNumGet; j++) {
...@@ -1531,7 +1532,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1531,7 +1532,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
if (epoch <= tmq->epoch) { if (epoch <= tmq->epoch) {
return false; return false;
...@@ -1554,14 +1555,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1554,14 +1555,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if (pTopicCur->vgs) { if (pTopicCur->vgs) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
for (int32_t j = 0; j < vgNumCur; j++) { for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[TSDB_OFFSET_LEN]; char buf[TSDB_OFFSET_LEN];
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf); vgKey, buf);
SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
...@@ -1591,7 +1592,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1591,7 +1592,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
atomic_store_8(&tmq->status, flag); atomic_store_8(&tmq->status, flag);
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set; return set;
} }
...@@ -1627,7 +1628,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1627,7 +1628,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SMqRspHead* head = pMsg->pData; SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch); int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) { if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch); tmq->consumerId, head->epoch, epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) { if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
...@@ -1639,7 +1640,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1639,7 +1640,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
} }
} else { } else {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch); head->epoch, epoch);
} }
...@@ -2067,12 +2068,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2067,12 +2068,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
timeout); timeout);
// in no topic status, delayed task also need to be processed // in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
taosMsleep(500); // sleep for a while taosMsleep(500); // sleep for a while
return NULL; return NULL;
} }
...@@ -2084,7 +2085,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2084,7 +2085,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL; return NULL;
} }
tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500); taosMsleep(500);
} }
} }
...@@ -2093,7 +2094,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2093,7 +2094,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) { if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
} }
rspObj = tmqHandleAllRsp(tmq, timeout, false); rspObj = tmqHandleAllRsp(tmq, timeout, false);
...@@ -2101,7 +2102,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2101,7 +2102,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
return NULL; return NULL;
} }
...@@ -2109,7 +2110,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2109,7 +2110,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
int64_t currentTime = taosGetTimestampMs(); int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime; int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) { if (elapsedTime > timeout) {
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime); tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL; return NULL;
} }
...@@ -2142,7 +2143,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) { ...@@ -2142,7 +2143,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
} }
int32_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq); displayConsumeStatistics(tmq);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
...@@ -2169,7 +2170,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { ...@@ -2169,7 +2170,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_destroy(lst); tmq_list_destroy(lst);
} else { } else {
tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
} }
taosRemoveRef(tmqMgmt.rsetId, tmq->refId); taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
...@@ -2432,7 +2433,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { ...@@ -2432,7 +2433,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
...@@ -2656,7 +2657,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2656,7 +2657,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
char offsetFormatBuf[TSDB_OFFSET_LEN]; char offsetFormatBuf[TSDB_OFFSET_LEN];
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
} }
...@@ -2693,7 +2694,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2693,7 +2694,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
char offsetBuf[TSDB_OFFSET_LEN] = {0}; char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
...@@ -2772,7 +2773,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2772,7 +2773,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
......
...@@ -361,11 +361,7 @@ static const SSysDbTableSchema consumerSchema[] = { ...@@ -361,11 +361,7 @@ static const SSysDbTableSchema consumerSchema[] = {
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
{.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false},
{.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
}; };
static const SSysDbTableSchema offsetSchema[] = { static const SSysDbTableSchema offsetSchema[] = {
......
...@@ -553,7 +553,6 @@ typedef struct { ...@@ -553,7 +553,6 @@ typedef struct {
int64_t rebalanceTime; int64_t rebalanceTime;
int8_t withTbName; int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
......
...@@ -697,7 +697,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -697,7 +697,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId));
pConsumerNew->withTbName = subscribe.withTbName; pConsumerNew->withTbName = subscribe.withTbName;
pConsumerNew->useSnapshot = subscribe.useSnapshot;
pConsumerNew->autoCommit = subscribe.autoCommit; pConsumerNew->autoCommit = subscribe.autoCommit;
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
...@@ -1186,25 +1185,16 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1186,25 +1185,16 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); char buf[TSDB_OFFSET_LEN] = {0};
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false);
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg};
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
varDataSetLen(buf, strlen(varDataVal(buf)));
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false);
numOfRows++; numOfRows++;
} }
......
...@@ -326,7 +326,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { ...@@ -326,7 +326,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
} }
tlen += taosEncodeFixedI8(buf, pConsumer->withTbName); tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot);
tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
...@@ -386,7 +385,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s ...@@ -386,7 +385,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
if(sver > 1){ if(sver > 1){
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot);
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
......
...@@ -243,6 +243,10 @@ class TDTestCase: ...@@ -243,6 +243,10 @@ class TDTestCase:
tdSql.checkData(0, 5, 0) tdSql.checkData(0, 5, 0)
break break
tdSql.query("show consumers")
tdSql.checkRows(1)
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest")
time.sleep(2) time.sleep(2)
tdLog.info("start insert data") tdLog.info("start insert data")
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册