提交 798c3a9f 编写于 作者: L Liu Jicong

add more log

上级 e4404dc8
......@@ -917,10 +917,10 @@ CREATE_MSG_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
tscDebug("consumer %ld update ep epoch %d to epoch %d", tmq->consumerId, tmq->epoch, epoch);
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
return false;
......@@ -938,17 +938,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
taosHashClear(pHash);
topic.topicName = strdup(pTopicEp->topic);
tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t j = 0; j < topicNumCur; j++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
if (vgNumCur == 0) break;
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
tscDebug("consumer %ld epoch %d vg %d build %s\n", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
......@@ -962,10 +964,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
tscDebug("consumer %ld epoch %d vg %d offset og to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
if (pOffset != NULL) {
offset = *pOffset;
tscDebug("consumer %ld epoch %d vg %d found %s\n", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
}
tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = {
......@@ -1231,7 +1233,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1);
tscDebug("consumer %ld skip vg %d skip cnt %ld", tmq->consumerId, pVg->vgId, skipCnt);
tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt);
continue;
#if 0
if (skipCnt < 30000) {
......
......@@ -160,10 +160,8 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
pOldConsumer->epoch++;
// TODO handle update
/*taosWLockLatch(&pOldConsumer->lock);*/
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
/*taosWUnLockLatch(&pOldConsumer->lock);*/
return 0;
......
......@@ -471,6 +471,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
mDebug("init subscribption %s, assign vg: %d", pSub->key, consumerEp.vgId);
int32_t msgLen;
if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
......
......@@ -212,19 +212,24 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
//TODO add lock
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
int32_t serverEpoch = pConsumer->epoch;
// TODO
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
mTrace("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, pConsumer->epoch, hbStatus);
mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, hbStatus);
atomic_store_32(&pConsumer->hbStatus, 0);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy(rsp.cgroup, pReq->cgroup);
if (epoch != pConsumer->epoch) {
mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
if (epoch != serverEpoch) {
mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, serverEpoch);
mDebug("consumer %ld try r lock", consumerId);
taosRLockLatch(&pConsumer->lock);
mDebug("consumer %ld r locked", consumerId);
SArray *pTopics = pConsumer->currentTopics;
int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
......@@ -238,7 +243,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
mInfo("topic %s has %d vg", topicName, pConsumer->epoch);
mInfo("topic %s has %d vg", topicName, serverEpoch);
SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
......@@ -264,6 +269,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
}
mndReleaseSubscribe(pMnode, pSub);
}
taosRUnLockLatch(&pConsumer->lock);
mDebug("consumer %ld r unlock", consumerId);
}
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen);
......@@ -272,7 +279,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
return -1;
}
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = pConsumer->epoch;
((SMqRspHead *)buf)->epoch = serverEpoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
......@@ -395,7 +402,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
taosMemoryFreeClear(pRebSub->key);
mInfo("mq rebalance subscription: %s", pSub->key);
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg));
// remove lost consumer
for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
......@@ -442,6 +449,9 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
mDebug("consumer %ld try w lock", pRebConsumer->consumerId);
taosWLockLatch(&pRebConsumer->lock);
mDebug("consumer %ld w locked", pRebConsumer->consumerId);
int32_t status = atomic_load_32(&pRebConsumer->status);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
......@@ -462,6 +472,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendCommitlog(pTrans, pConsumerRaw);
}
taosWUnLockLatch(&pRebConsumer->lock);
mDebug("consumer %ld w unlock", pRebConsumer->consumerId);
mndReleaseConsumer(pMnode, pRebConsumer);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册