未验证 提交 dfd2278d 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20042 from taosdata/fix/liaohj

refactor: do some internal refactor and add some logs.
...@@ -509,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -509,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
.handle = NULL, .handle = NULL,
}; };
tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey, tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version); pVg->vgId, pOffset->val.version);
// TODO: put into cb // TODO: put into cb
...@@ -643,16 +643,15 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, ...@@ -643,16 +643,15 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName,
pVg->vgId, j + 1, numOfVgroups);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId, tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
pVg->currentOffset.version, pVg->committedOffset.version); pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue; continue;
} }
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
} }
} }
} }
...@@ -808,7 +807,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -808,7 +807,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId); tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
...@@ -816,7 +815,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -816,7 +815,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
// do nothing // do nothing
...@@ -1309,7 +1308,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1309,7 +1308,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tscDebug("consumer:0x%" PRIx64" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
...@@ -1634,14 +1633,18 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1634,14 +1633,18 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
// broadcast the poll request to all related vnodes // broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt); vgSkipCnt);
continue; continue;
/*if (vgSkipCnt < 10000) continue;*/ /*if (vgSkipCnt < 10000) continue;*/
...@@ -1653,6 +1656,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1653,6 +1656,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
} }
#endif #endif
} }
atomic_store_32(&pVg->vgSkipCnt, 0); atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq req = {0}; SMqPollReq req = {0};
...@@ -1663,6 +1667,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1663,6 +1667,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
return -1; return -1;
} }
char* msg = taosMemoryCalloc(1, msgSize); char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
...@@ -1684,6 +1689,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1684,6 +1689,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
return -1; return -1;
} }
pParam->refId = tmq->refId; pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch; pParam->epoch = tmq->epoch;
...@@ -1790,7 +1796,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1790,7 +1796,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -1809,7 +1815,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1809,7 +1815,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -1840,7 +1846,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1840,7 +1846,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -1885,7 +1891,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1885,7 +1891,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (retryCnt++ > 10) { if (retryCnt++ > 10) {
return NULL; return NULL;
} }
tscDebug("consumer not ready, retry");
tscDebug("consumer:0x%"PRIx64" not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500); taosMsleep(500);
} }
} }
......
...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) { ...@@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
......
...@@ -413,19 +413,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { ...@@ -413,19 +413,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
return (void *)buf; return (void *)buf;
} }
SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { SMqSubscribeObj *tNewSubscribeObj(const char* key) {
SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL; if (pSubObj == NULL) {
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN); return NULL;
taosInitRWLatch(&pSubNew->lock); }
pSubNew->vgNum = 0;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *)); memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubObj->lock);
pSubObj->vgNum = 0;
pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
return pSubNew; // TODO set hash free fp
/*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
return pSubObj;
} }
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册