提交 c7e6883a 编写于 作者: wmmhello's avatar wmmhello

fix:tmq ref err & vg error

上级 54f48601
......@@ -87,6 +87,7 @@ struct tmq_t {
void* commitCbUserParam;
// status
SRWLatch lock;
int8_t status;
int32_t epoch;
#if 0
......@@ -156,6 +157,7 @@ typedef struct {
int8_t tmqRspType;
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
char topicName[TSDB_TOPIC_FNAME_LEN];
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
uint64_t reqId;
......@@ -168,8 +170,8 @@ typedef struct {
} SMqPollRspWrapper;
typedef struct {
int64_t refId;
int32_t epoch;
// int64_t refId;
// int32_t epoch;
tsem_t rspSem;
int32_t rspErr;
} SMqSubscribeCbParam;
......@@ -184,8 +186,9 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
SMqClientVg* pVg;
SMqClientTopic* pTopic;
char topicName[TSDB_TOPIC_FNAME_LEN];
// SMqClientVg* pVg;
// SMqClientTopic* pTopic;
int32_t vgId;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
......@@ -709,8 +712,8 @@ static void generateTimedTask(int64_t refId, int32_t type) {
*pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
taosReleaseRef(tmqMgmt.rsetId, refId);
}
void tmqAssignAskEpTask(void* param, void* tmrId) {
......@@ -1037,6 +1040,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset;
taosInitRWLatch(&pTmq->lock);
pTmq->hbBgEnable = conf->hbBgEnable;
......@@ -1140,8 +1144,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SMqSubscribeCbParam param = {
.rspErr = 0,
.refId = tmq->refId,
.epoch = tmq->epoch,
// .refId = tmq->refId,
// .epoch = tmq->epoch,
};
if (tsem_init(&param.rspSem, 0, 0) != 0) {
......@@ -1217,12 +1221,40 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf->commitCbUserParam = param;
}
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId){
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for(int i = 0; i < topicNumCur; i++){
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if(strcmp(pTopicCur->topicName, topicName) == 0){
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
if(pVgCur->vgId == vgId){
return pVgCur;
}
}
}
}
return NULL;
}
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for(int i = 0; i < topicNumCur; i++){
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if(strcmp(pTopicCur->topicName, topicName) == 0){
return pTopicCur;
}
}
return NULL;
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int64_t refId = pParam->refId;
SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic;
// SMqClientVg* pVg = pParam->pVg;
// SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
......@@ -1303,11 +1335,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
pRspWrapper->tmqRspType = rspType;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
// pRspWrapper->vgHandle = pVg;
// pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pRspWrapper->vgId = pVg->vgId;
pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
......@@ -1351,7 +1384,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
CREATE_MSG_FAIL:
if (epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWLockLatch(&tmq->lock);
}
tsem_post(&tmq->rspSem);
......@@ -1472,11 +1510,13 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosHashCleanup(pVgOffsetHashMap);
taosWLockLatch(&tmq->lock);
// destroy current buffered existed topics info
if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
}
tmq->clientTopics = newTopics;
taosWUnLockLatch(&tmq->lock);
int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag);
......@@ -1492,7 +1532,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
if (tmq == NULL) {
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
......@@ -1652,8 +1692,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pVg = pVg; // pVg may be released,fix it
pParam->pTopic = pTopic;
// pParam->pVg = pVg; // pVg may be released,fix it
// pParam->pTopic = pTopic;
strcpy(pParam->topicName, pTopic->topicName);
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
......@@ -1779,8 +1820,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
// update the epset
if (pollRspWrapper->pEpset != NULL) {
SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
......@@ -1829,7 +1876,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
......@@ -1849,7 +1903,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册