Unverified commit 5b8be1be, authored by Liu Jicong, committed by GitHub

Merge pull request #11226 from taosdata/feature/tq

Feature/tq
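In brief, judging from the hunks below: the client-side tmq consumer gains an epSkipCnt counter and a per-vgroup skipCnt so that ask-ep and poll attempts skipped because a request is still in flight are counted (the ask-ep path is forced through after 5000 skips); the rspSem posts and waits in the poll path are commented out; client and mnode log messages now carry the consumer id, vgroup id, and epoch; and the mnode snapshots the consumer epoch into serverEpoch and guards the consumer object with read/write latches in the get-sub-ep and rebalance handlers.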
@@ -79,6 +79,7 @@ struct tmq_t {
   tmq_commit_cb* commit_cb;
   int32_t nextTopicIdx;
   int8_t epStatus;
+  int32_t epSkipCnt;
   int32_t waitingRequest;
   int32_t readyRequest;
   SArray* clientTopics;  // SArray<SMqClientTopic>
@@ -107,6 +108,7 @@ typedef struct {
   // connection info
   int32_t vgId;
   int32_t vgStatus;
+  int64_t skipCnt;
   SEpSet epSet;
 } SMqClientVg;
@@ -138,6 +140,7 @@ typedef struct {
   tmq_t* tmq;
   SMqClientVg* pVg;
   int32_t epoch;
+  int32_t vgId;
   tsem_t rspSem;
   tmq_message_t** msg;
   int32_t sync;
@@ -313,6 +316,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
   pTmq->waitingRequest = 0;
   pTmq->readyRequest = 0;
   pTmq->epStatus = 0;
+  pTmq->epSkipCnt = 0;
   // set conf
   strcpy(pTmq->clientId, conf->clientId);
   strcpy(pTmq->groupId, conf->groupId);
@@ -348,6 +352,8 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
   pTmq->epoch = 0;
   pTmq->waitingRequest = 0;
   pTmq->readyRequest = 0;
+  pTmq->epStatus = 0;
+  pTmq->epSkipCnt = 0;
   // set conf
   strcpy(pTmq->clientId, conf->clientId);
   strcpy(pTmq->groupId, conf->groupId);
@@ -834,7 +840,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
   SMqClientVg* pVg = pParam->pVg;
   tmq_t* tmq = pParam->tmq;
   if (code != 0) {
-    tscWarn("msg discard, code:%x", code);
+    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
     goto CREATE_MSG_FAIL;
   }
@@ -842,13 +848,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
   int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
   if (msgEpoch < tmqEpoch) {
     /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
-    tsem_post(&tmq->rspSem);
-    tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
+    /*tsem_post(&tmq->rspSem);*/
+    tscWarn("discard rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
     return 0;
   }
   if (msgEpoch != tmqEpoch) {
-    tscWarn("mismatch rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
+    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
   } else {
     atomic_sub_fetch_32(&tmq->waitingRequest, 1);
   }
@@ -892,29 +898,29 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
   }
 #endif
-  tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
+  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, pRsp->msg.reqOffset,
            pRsp->msg.rspOffset);
   pRsp->vg = pParam->pVg;
   taosWriteQitem(tmq->mqueue, pRsp);
   atomic_add_fetch_32(&tmq->readyRequest, 1);
-  tsem_post(&tmq->rspSem);
+  /*tsem_post(&tmq->rspSem);*/
   return 0;
 CREATE_MSG_FAIL:
   if (pParam->epoch == tmq->epoch) {
     atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
   }
-  tsem_post(&tmq->rspSem);
+  /*tsem_post(&tmq->rspSem);*/
   return code;
 }
 bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
   /*printf("call update ep %d\n", epoch);*/
-  tscDebug("tmq update ep epoch %d to epoch %d", tmq->epoch, epoch);
   bool set = false;
   int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
   char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
+  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet);
   SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
   if (newTopics == NULL) {
     return false;
@@ -932,17 +938,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
     taosHashClear(pHash);
     topic.topicName = strdup(pTopicEp->topic);
+    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
     int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
     for (int32_t j = 0; j < topicNumCur; j++) {
       // find old topic
       SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
       if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
         int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
+        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
         if (vgNumCur == 0) break;
         for (int32_t k = 0; k < vgNumCur; k++) {
           SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
           sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
-          tscDebug("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);
+          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
           taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
         }
         break;
@@ -956,18 +964,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
       sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
       int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
       int64_t offset = pVgEp->offset;
-      tscDebug("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);
+      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
       if (pOffset != NULL) {
         offset = *pOffset;
-        tscDebug("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);
+        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
       }
-      tscDebug("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);
+      tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
       SMqClientVg clientVg = {
           .pollCnt = 0,
           .currentOffset = offset,
           .vgId = pVgEp->vgId,
           .epSet = pVgEp->epSet,
           .vgStatus = TMQ_VG_STATUS__IDLE,
+          .skipCnt = 0,
       };
       taosArrayPush(topic.vgs, &clientVg);
       set = true;
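The hunk above carries each vgroup's consumed offset across an assignment update: the old offsets are stashed in a hash keyed by "topic:vgId" (taosHashPut), and every vgroup in the new assignment is looked up there before falling back to the offset delivered in the ep response. A reduced sketch of that carry-over, with a flat array standing in for the hash; all names and types here are illustrative, not the TDengine ones:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Illustrative stand-in for the "topic:vgId" -> offset hash used above. */
typedef struct {
  char    key[64];
  int64_t offset;
} OldVgOffset;

/* Return the carried-over offset for (topic, vgId) if the vgroup survived the
 * update, otherwise the default offset delivered in the ep response. */
static int64_t carryOverOffset(const OldVgOffset *old, int n, const char *topic,
                               int32_t vgId, int64_t rspOffset) {
  char key[64];
  snprintf(key, sizeof(key), "%s:%d", topic, vgId);
  for (int i = 0; i < n; i++) {
    if (strcmp(old[i].key, key) == 0) return old[i].offset;  /* vgroup kept: keep its offset */
  }
  return rspOffset;  /* new vgroup: start from the offset in the ep response */
}

int main(void) {
  OldVgOffset old[] = {{"topic1:3", 42}};
  printf("%" PRId64 "\n", carryOverOffset(old, 1, "topic1", 3, 0));  /* surviving vgroup: 42 */
  printf("%" PRId64 "\n", carryOverOffset(old, 1, "topic1", 4, 7));  /* new vgroup: 7 */
  return 0;
}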
@@ -984,9 +993,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
 int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
   SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
   tmq_t* tmq = pParam->tmq;
-  tscDebug("consumer %ld recv ep", tmq->consumerId);
   if (code != 0) {
-    tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
+    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
     goto END;
   }
@@ -995,6 +1003,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
   // Epoch will only increase when received newer epoch ep msg
   SMqRspHead* head = pMsg->pData;
   int32_t epoch = atomic_load_32(&tmq->epoch);
+  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
   if (head->epoch <= epoch) {
     goto END;
   }
@@ -1019,7 +1028,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
     tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp);
     taosWriteQitem(tmq->mqueue, pRsp);
-    tsem_post(&tmq->rspSem);
+    /*tsem_post(&tmq->rspSem);*/
   }
 END:
@@ -1033,9 +1042,11 @@ END:
 int32_t tmqAskEp(tmq_t* tmq, bool sync) {
   int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
   if (epStatus == 1) {
-    tscDebug("consumer %ld skip ask ep", tmq->consumerId);
-    return 0;
+    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
+    tscDebug("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
+    if (epSkipCnt < 5000) return 0;
   }
+  atomic_store_32(&tmq->epSkipCnt, 0);
   int32_t tlen = sizeof(SMqCMGetSubEpReq);
   SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
   if (req == NULL) {
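The tmqAskEp change above is a small throttling pattern: a compare-and-swap on epStatus keeps at most one ep request in flight, and epSkipCnt counts how many attempts were skipped meanwhile, forcing a refresh through once the count reaches 5000. A minimal sketch of the same pattern in plain C11 atomics; the standalone type and function names are illustrative, not the TDengine primitives:

#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
  atomic_int epStatus;   /* 0: idle, 1: an ep request is already in flight */
  atomic_int epSkipCnt;  /* ask-ep attempts skipped while a request is in flight */
} AskEpGuard;

/* Returns true when the caller should actually send an ep request. */
static bool shouldAskEp(AskEpGuard *g) {
  int expected = 0;
  if (!atomic_compare_exchange_strong(&g->epStatus, &expected, 1)) {
    /* Another request is in flight: count the skip, and only force a new
     * request through after a large number of skipped attempts. */
    int skipped = atomic_fetch_add(&g->epSkipCnt, 1) + 1;
    if (skipped < 5000) return false;
  }
  atomic_store(&g->epSkipCnt, 0);  /* about to send: reset the skip counter */
  return true;
}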
@@ -1221,24 +1232,34 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
       SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
       int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
       if (vgStatus != TMQ_VG_STATUS__IDLE) {
-        tscDebug("consumer %ld skip vg %d", tmq->consumerId, pVg->vgId);
+        int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1);
+        tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt);
         continue;
+#if 0
+        if (skipCnt < 30000) {
+          continue;
+        } else {
+          tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
+        }
+#endif
       }
+      atomic_store_64(&pVg->skipCnt, 0);
       SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
       if (pReq == NULL) {
         atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
-        tsem_post(&tmq->rspSem);
+        /*tsem_post(&tmq->rspSem);*/
         return -1;
       }
       SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
       if (pParam == NULL) {
         taosMemoryFree(pReq);
         atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
-        tsem_post(&tmq->rspSem);
+        /*tsem_post(&tmq->rspSem);*/
         return -1;
       }
       pParam->tmq = tmq;
       pParam->pVg = pVg;
+      pParam->vgId = pVg->vgId;
       pParam->epoch = tmq->epoch;
       pParam->sync = 0;
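The same idea is applied per vgroup above: vgStatus is CASed from IDLE to WAIT before a poll request goes out, skipped attempts are counted in the new skipCnt field, and the counter is reset once a request is actually sent (the 30000-skip reset branch is parked under #if 0 for now). A condensed sketch of that gate with illustrative types, not the TDengine ones:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

enum { VG_STATUS_IDLE = 0, VG_STATUS_WAIT = 1 };

typedef struct {
  atomic_int      status;   /* IDLE when no poll request is outstanding for this vgroup */
  _Atomic int64_t skipCnt;  /* poll attempts skipped while a request is outstanding */
} VgGate;

/* Poll loop: try to claim the vgroup before building and sending a request. */
static bool vgTryAcquire(VgGate *vg) {
  int expected = VG_STATUS_IDLE;
  if (!atomic_compare_exchange_strong(&vg->status, &expected, VG_STATUS_WAIT)) {
    atomic_fetch_add(&vg->skipCnt, 1);  /* request still in flight: skip this round */
    return false;
  }
  atomic_store(&vg->skipCnt, 0);  /* claimed: a request is about to be sent */
  return true;
}

/* Poll callback (or error path): give the vgroup back to the poll loop. */
static void vgRelease(VgGate *vg) {
  atomic_store(&vg->status, VG_STATUS_IDLE);
}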
@@ -1247,7 +1268,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
         taosMemoryFree(pReq);
         taosMemoryFree(pParam);
         atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
-        tsem_post(&tmq->rspSem);
+        /*tsem_post(&tmq->rspSem);*/
         return -1;
       }
@@ -1265,7 +1286,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
       int64_t transporterId = 0;
       /*printf("send poll\n");*/
       atomic_add_fetch_32(&tmq->waitingRequest, 1);
-      tscDebug("consumer %ld send poll: vg %d, req offset %ld", tmq->consumerId, pVg->vgId, pVg->currentOffset);
+      tscDebug("consumer %ld send poll: vg %d, epoch %d, req offset %ld", tmq->consumerId, pVg->vgId, tmq->epoch, pVg->currentOffset);
       /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
       asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
       pVg->pollCnt++;
@@ -1379,7 +1400,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
     tmqAskEp(tmq, false);
     tmqPollImpl(tmq, blocking_time);
-    tsem_wait(&tmq->rspSem);
+    /*tsem_wait(&tmq->rspSem);*/
     rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
     if (rspMsg) {
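With tsem_wait commented out here (and the matching tsem_post calls above), tmq_consumer_poll no longer blocks on the semaphore between rounds; it keeps firing asynchronous polls and draining the response queue until a message arrives or the blocking time runs out. A rough sketch of that loop shape, with hypothetical helper names standing in for tmqAskEp/tmqPollImpl and tmqHandleAllRsp:

#include <stdint.h>
#include <stddef.h>
#include <time.h>

typedef struct tmq_message tmq_message;

/* Hypothetical stand-ins for the real helpers used by tmq_consumer_poll. */
extern void         fireAsyncPolls(void);      /* ask for ep updates + send poll requests */
extern tmq_message *drainResponseQueue(void);  /* handle whatever responses have arrived */

static int64_t nowMs(void) {
  struct timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

tmq_message *consumerPollSketch(int64_t blockingTimeMs) {
  int64_t deadline = nowMs() + blockingTimeMs;
  while (1) {
    fireAsyncPolls();
    tmq_message *msg = drainResponseQueue();
    if (msg != NULL) return msg;           /* got a message */
    if (nowMs() >= deadline) return NULL;  /* time budget exhausted */
    /* No semaphore wait any more: loop and try again. */
  }
}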
......
@@ -160,10 +160,8 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
 static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
   mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
-  pOldConsumer->epoch++;
-  // TODO handle update
   /*taosWLockLatch(&pOldConsumer->lock);*/
+  atomic_add_fetch_32(&pOldConsumer->epoch, 1);
   /*taosWUnLockLatch(&pOldConsumer->lock);*/
   return 0;
......
@@ -471,6 +471,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
       consumerEp.consumerId = -1;
       consumerEp.epSet = plan->execNode.epSet;
       consumerEp.vgId = plan->execNode.nodeId;
+      mDebug("init subscribption %s, assign vg: %d", pSub->key, consumerEp.vgId);
       int32_t msgLen;
       if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
         sdbRelease(pSdb, pVgroup);
......
@@ -212,19 +212,24 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
     terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
     return -1;
   }
+  //TODO add lock
   ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
+  int32_t serverEpoch = pConsumer->epoch;
   // TODO
   int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
-  mTrace("try to get sub ep, old val: %d", hbStatus);
+  mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, hbStatus);
   atomic_store_32(&pConsumer->hbStatus, 0);
   /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
   /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
   /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
   strcpy(rsp.cgroup, pReq->cgroup);
-  if (epoch != pConsumer->epoch) {
-    mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch);
+  if (epoch != serverEpoch) {
+    mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, serverEpoch);
+    mDebug("consumer %ld try r lock", consumerId);
+    taosRLockLatch(&pConsumer->lock);
+    mDebug("consumer %ld r locked", consumerId);
     SArray *pTopics = pConsumer->currentTopics;
     int32_t sz = taosArrayGetSize(pTopics);
     rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
@@ -238,7 +243,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
         SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
         if (consumerId == pSubConsumer->consumerId) {
           int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
-          mInfo("topic %s has %d vg", topicName, pConsumer->epoch);
+          mInfo("topic %s has %d vg", topicName, serverEpoch);
           SMqSubTopicEp topicEp;
           strcpy(topicEp.topic, topicName);
           topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
@@ -264,6 +269,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
       }
       mndReleaseSubscribe(pMnode, pSub);
     }
+    taosRUnLockLatch(&pConsumer->lock);
+    mDebug("consumer %ld r unlock", consumerId);
   }
   int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
   void *buf = rpcMallocCont(tlen);
@@ -272,7 +279,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
     return -1;
   }
   ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
-  ((SMqRspHead *)buf)->epoch = pConsumer->epoch;
+  ((SMqRspHead *)buf)->epoch = serverEpoch;
   ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
   void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
@@ -395,7 +402,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
     SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
     taosMemoryFreeClear(pRebSub->key);
-    mInfo("mq rebalance subscription: %s", pSub->key);
+    mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg));
     // remove lost consumer
     for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
@@ -442,6 +449,9 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
       }
       SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
+      mDebug("consumer %ld try w lock", pRebConsumer->consumerId);
+      taosWLockLatch(&pRebConsumer->lock);
+      mDebug("consumer %ld w locked", pRebConsumer->consumerId);
       int32_t status = atomic_load_32(&pRebConsumer->status);
       if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
           (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
@@ -462,6 +472,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
         sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
         mndTransAppendCommitlog(pTrans, pConsumerRaw);
       }
+      taosWUnLockLatch(&pRebConsumer->lock);
+      mDebug("consumer %ld w unlock", pRebConsumer->consumerId);
       mndReleaseConsumer(pMnode, pRebConsumer);
     }
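On the mnode side, the new taosRLockLatch/taosWLockLatch pairing means the assignment a consumer reads in mndProcessGetSubEpReq cannot change underneath it while mndProcessDoRebalanceMsg is moving vgroups around. A reduced sketch of that pairing using a plain pthread rwlock; this is illustrative and not how the taos latch itself is implemented:

#include <pthread.h>
#include <stdint.h>

typedef struct {
  pthread_rwlock_t lock;
  int32_t          epoch;
  /* ... current topic/vgroup assignment ... */
} ConsumerObj;

/* Get-sub-ep path: snapshot the epoch once, then hold a read lock while the
 * assignment is copied into the response, so both stay consistent. */
static int32_t buildEpRsp(ConsumerObj *c) {
  int32_t serverEpoch = c->epoch;  /* one snapshot used for the log, the check and the rsp header */
  pthread_rwlock_rdlock(&c->lock);
  /* ... copy current topics/vgroups into the response ... */
  pthread_rwlock_unlock(&c->lock);
  return serverEpoch;
}

/* Rebalance path: hold the write lock while the assignment and status change. */
static void rebalanceConsumer(ConsumerObj *c) {
  pthread_rwlock_wrlock(&c->lock);
  /* ... move vgroups between consumers, update status, bump epoch ... */
  pthread_rwlock_unlock(&c->lock);
}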
......
@@ -747,4 +747,4 @@ void taosSetAllDebugFlag(int32_t flag) {
   fsDebugFlag = flag;
   uInfo("all debug flag are set to %d", flag);
-}
\ No newline at end of file
+}