提交 9ca77572 编写于 作者: H Haojun Liao

fix(tmq): fix race condition.

上级 0d1b2e4b
...@@ -106,17 +106,13 @@ struct tmq_t { ...@@ -106,17 +106,13 @@ struct tmq_t {
tmr_h reportTimer; tmr_h reportTimer;
tmr_h commitTimer; tmr_h commitTimer;
// connection STscObj* pTscObj; // connection
STscObj* pTscObj; SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of rsp
// container STaosQall* qall;
SArray* clientTopics; // SArray<SMqClientTopic> STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
STaosQueue* mqueue; // queue of rsp TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets.
STaosQall* qall; tsem_t rspSem;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
// ctl
tsem_t rspSem;
}; };
enum { enum {
...@@ -213,9 +209,9 @@ typedef struct { ...@@ -213,9 +209,9 @@ typedef struct {
typedef struct { typedef struct {
SMqCommitCbParamSet* params; SMqCommitCbParamSet* params;
STqOffset* pOffset; STqOffset* pOffset;
SMqClientVg* pMqVg; char topicName[TSDB_TOPIC_FNAME_LEN];
/*char topicName[TSDB_TOPIC_FNAME_LEN];*/ int32_t vgId;
/*int32_t vgId;*/ tmq_t* pTmq;
} SMqCommitCbParam; } SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async); static int32_t tmqAskEp(tmq_t* tmq, bool async);
...@@ -431,6 +427,7 @@ static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) { ...@@ -431,6 +427,7 @@ static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array // push into array
#if 0 #if 0
if (code == 0) { if (code == 0) {
...@@ -440,15 +437,34 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -440,15 +437,34 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
} }
#endif #endif
// there may be race condition. fix it if (pBuf->pEpSet != NULL) {
if (pBuf->pEpSet != NULL && pParam->pMqVg != NULL) { // todo extract method
SMqClientVg* pMqVg = pParam->pMqVg; taosThreadMutexLock(&pParam->pTmq->lock);
int32_t numOfTopics = taosArrayGetSize(pParam->pTmq->clientTopics);
for(int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopic = taosArrayGet(pParam->pTmq->clientTopics, i);
if (strcmp(pTopic->topicName, pParam->topicName) != 0) {
continue;
}
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for(int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (pClientVg->vgId == pParam->vgId) {
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet);
SEp* pOld = GET_ACTIVE_EP(&(pClientVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pParam->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pClientVg->epSet = *pBuf->pEpSet;
break;
}
}
break;
}
SEp* pEp = GET_ACTIVE_EP(pBuf->pEpSet); taosThreadMutexUnlock(&pParam->pTmq->lock);
SEp* pOld = GET_ACTIVE_EP(&(pMqVg->epSet));
uDebug("subKey:%s update the epset vgId:%d, ep:%s:%d, old ep:%s:%d", pParam->pOffset->subKey, pMqVg->vgId,
pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
pParam->pMqVg->epSet = *pBuf->pEpSet;
} }
taosMemoryFree(pParam->pOffset); taosMemoryFree(pParam->pOffset);
...@@ -508,7 +524,10 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -508,7 +524,10 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->pOffset = pOffset; pParam->pOffset = pOffset;
pParam->pMqVg = pVg; // there may be an race condition pParam->vgId = pVg->vgId;
pParam->pTmq = tmq;
tstrncpy(pParam->topicName, pTopic->topicName, tListLen(pParam->topicName));
// build send info // build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
...@@ -659,6 +678,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, ...@@ -659,6 +678,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
for (int32_t i = 0; i < numOfTopics; i++) { for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
// todo race condition: fix it
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
...@@ -944,10 +964,14 @@ void tmqFreeImpl(void* handle) { ...@@ -944,10 +964,14 @@ void tmqFreeImpl(void* handle) {
tmqClearUnhandleMsg(tmq); tmqClearUnhandleMsg(tmq);
taosCloseQueue(tmq->mqueue); taosCloseQueue(tmq->mqueue);
} }
if (tmq->delayedTask) taosCloseQueue(tmq->delayedTask);
taosFreeQall(tmq->qall);
if (tmq->delayedTask) {
taosCloseQueue(tmq->delayedTask);
}
taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem); tsem_destroy(&tmq->rspSem);
taosThreadMutexDestroy(&tmq->lock);
int32_t sz = taosArrayGetSize(tmq->clientTopics); int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -955,6 +979,7 @@ void tmqFreeImpl(void* handle) { ...@@ -955,6 +979,7 @@ void tmqFreeImpl(void* handle) {
taosMemoryFreeClear(pTopic->schema.pSchema); taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
taosArrayDestroy(tmq->clientTopics); taosArrayDestroy(tmq->clientTopics);
taos_close_internal(tmq->pTscObj); taos_close_internal(tmq->pTscObj);
taosMemoryFree(tmq); taosMemoryFree(tmq);
...@@ -993,9 +1018,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -993,9 +1018,10 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
pTmq->mqueue = taosOpenQueue(); pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
pTmq->delayedTask = taosOpenQueue(); pTmq->delayedTask = taosOpenQueue();
pTmq->qall = taosAllocateQall();
taosThreadMutexInit(&pTmq->lock, NULL);
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL || if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
conf->groupId[0] == 0) { conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1193,7 +1219,6 @@ FAIL: ...@@ -1193,7 +1219,6 @@ FAIL:
} }
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
//
conf->commitCb = cb; conf->commitCb = cb;
conf->commitCbUserParam = param; conf->commitCbUserParam = param;
} }
...@@ -1328,7 +1353,53 @@ CREATE_MSG_FAIL: ...@@ -1328,7 +1353,53 @@ CREATE_MSG_FAIL:
return -1; return -1;
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
tmq_t* tmq) {
pTopic->schema = pTopicEp->schema;
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId);
STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
if (pOffset != NULL) {
offsetNew = *pOffset;
}
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offsetNew,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
};
taosArrayPush(pTopic->vgs, &clientVg);
}
}
static void freeClientVgInfo(void* param) {
SMqClientTopic* pTopic = param;
if (pTopic->schema.nCols) {
taosMemoryFreeClear(pTopic->schema.pSchema);
}
taosArrayDestroy(pTopic->vgs);
}
static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false; bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
...@@ -1343,12 +1414,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1343,12 +1414,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
return false; return false;
} }
SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
if (pHash == NULL) { if (pVgOffsetHashMap == NULL) {
taosArrayDestroy(newTopics); taosArrayDestroy(newTopics);
return false; return false;
} }
// todo extract method
for (int32_t i = 0; i < topicNumCur; i++) { for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic // find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
...@@ -1362,7 +1434,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1362,7 +1434,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
tFormatOffset(buf, 80, &pVgCur->currentOffset); tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf); pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
} }
} }
} }
...@@ -1370,62 +1442,25 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1370,62 +1442,25 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < topicNumGet; i++) { for (int32_t i = 0; i < topicNumGet; i++) {
SMqClientTopic topic = {0}; SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
topic.schema = pTopicEp->schema; initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
if (pOffset != NULL) {
offsetNew = *pOffset;
}
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offsetNew,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
};
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(newTopics, &topic); taosArrayPush(newTopics, &topic);
} }
// destroy current buffered existed topics info // destroy current buffered existed topics info
if (tmq->clientTopics) { if (tmq->clientTopics) {
int32_t sz = taosArrayGetSize(tmq->clientTopics); taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->vgs);
}
taosArrayDestroy(tmq->clientTopics);
} }
taosHashCleanup(pHash); taosHashCleanup(pVgOffsetHashMap);
tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0) { taosThreadMutexLock(&tmq->lock);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); tmq->clientTopics = newTopics;
} else { taosThreadMutexUnlock(&tmq->lock);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag);
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set; return set;
} }
...@@ -1448,7 +1483,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1448,7 +1483,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
pParam->code = code; pParam->code = code;
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async, tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async,
tstrerror(code)); tstrerror(code));
goto END; goto END;
...@@ -1471,8 +1506,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1471,8 +1506,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (!async) { if (!async) {
SMqAskEpRsp rsp; SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
/*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/
tmqUpdateEp(tmq, head->epoch, &rsp); tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp); tDeleteSMqAskEpRsp(&rsp);
} else { } else {
...@@ -1493,7 +1526,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1493,7 +1526,6 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
END: END:
/*atomic_store_8(&tmq->epStatus, 0);*/
if (!async) { if (!async) {
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
} else { } else {
...@@ -1545,7 +1577,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1545,7 +1577,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
if (pParam == NULL) { if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId); tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
taosMemoryFree(pReq); taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1; return -1;
} }
...@@ -1559,7 +1590,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1559,7 +1590,6 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
taosMemoryFree(pReq); taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1; return -1;
} }
...@@ -1995,6 +2025,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { ...@@ -1995,6 +2025,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_destroy(lst); tmq_list_destroy(lst);
} }
taosRemoveRef(tmqMgmt.rsetId, tmq->refId); taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册