未验证 提交 91940ac7 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13714 from taosdata/feature/stream

enh(tmq): add log
......@@ -447,6 +447,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
mInfo("receive subscribe request from new consumer: %ld", consumerId);
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
......@@ -463,7 +465,12 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} else {
/*taosRLockLatch(&pConsumerOld->lock);*/
int32_t status = atomic_load_32(&pConsumerOld->status);
mInfo("receive subscribe request from old consumer: %ld, current status: %s", consumerId,
mndConsumerStatusName(status));
if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto SUBSCRIBE_OVER;
......
......@@ -183,7 +183,10 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
mInfo("commit offset %ld to vg %d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId,
pOffset->cgroup, pOffset->topicName);
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
mError("submit offset to topic %s failed", pOffset->topicName);
return -1;
}
bool create = false;
......@@ -192,7 +195,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
continue;
}
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
......
......@@ -130,7 +130,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
ASSERT(pHandle);
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
pReq->subKey);
return -1;
}
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
return -1;
}
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) {
......
......@@ -98,7 +98,7 @@ python3 ./test.py -f 2-query/statecount.py
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
python3 ./test.py -f 7-tmq/subscribeDb0.py
#python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeDb1.py
python3 ./test.py -f 7-tmq/subscribeStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
python3 ./test.py -f 7-tmq/subscribeStb1.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册