未验证 提交 db63b436 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21953 from taosdata/mark/tmq

fix:add lock for tmq->clientTopic
......@@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
......@@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
pTopicName, numOfTopics);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
taosRUnLockLatch(&tmq->lock);
return;
}
......@@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
vgId, numOfVgroups, pTopicName);
taosMemoryFree(pParamSet);
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
taosRUnLockLatch(&tmq->lock);
return;
}
......@@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
taosRUnLockLatch(&tmq->lock);
}
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
......@@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
// init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1;
taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
......@@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
}
}
}
taosRUnLockLatch(&tmq->lock);
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics);
......@@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMqHbReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
taosRLockLatch(&tmq->lock);
// if(tmq->needReportOffsetRows){
req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
......@@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
}
// tmq->needReportOffsetRows = false;
// }
taosRUnLockLatch(&tmq->lock);
int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
if (tlen < 0) {
......@@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
if (*topics == NULL) {
*topics = tmq_list_new();
}
taosRLockLatch(&tmq->lock);
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
}
taosRUnLockLatch(&tmq->lock);
return 0;
}
......@@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) {
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
if (epoch <= tmq->epoch) {
return false;
}
......@@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
return false;
}
taosWLockLatch(&tmq->lock);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
// todo extract method
for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic
......@@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosHashCleanup(pVgOffsetHashMap);
taosWLockLatch(&tmq->lock);
// destroy current buffered existed topics info
if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
......@@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
return 0;
}
int32_t code = 0;
taosWLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
......@@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
continue;
......@@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
atomic_store_32(&pVg->vgSkipCnt, 0);
int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto end;
}
}
}
tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
return 0;
end:
taosWUnLockLatch(&tmq->lock);
tscDebug("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code);
return code;
}
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
......@@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
// update the epset
......@@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
}
taosWUnLockLatch(&tmq->lock);
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
......@@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
......@@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
......@@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
taosWUnLockLatch(&tmq->lock);
return NULL;
}
......@@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
continue;
} else {
pVg->emptyBlockReceiveTs = 0; // reset the ts
}
// build rsp
void* pRsp = NULL;
int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
}
// build rsp
void* pRsp = NULL;
int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
}
tmq->totalRows += numOfRows;
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN];
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
return pRsp;
char buf[TSDB_OFFSET_LEN];
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
return pRsp;
}
taosWUnLockLatch(&tmq->lock);
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
......@@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
}
static void displayConsumeStatistics(const tmq_t* pTmq) {
static void displayConsumeStatistics(tmq_t* pTmq) {
taosRLockLatch(&pTmq->lock);
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
......@@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) {
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
}
}
taosRUnLockLatch(&pTmq->lock);
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
......@@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t* numOfAssignment) {
*numOfAssignment = 0;
*assignment = NULL;
SMqVgCommon* pCommon = NULL;
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
int32_t code = TSDB_CODE_SUCCESS;
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
goto end;
}
// in case of snapshot is opened, no valid offset will return
......@@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
if (*assignment == NULL) {
tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
(*numOfAssignment) * sizeof(tmq_topic_assignment));
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
bool needFetch = false;
......@@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
if (needFetch) {
SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
if (pCommon == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
code = terrno;
goto end;
}
pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment));
......@@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
if (pParam == NULL) {
destroyCommonInfo(pCommon);
return terrno;
code = terrno;
goto end;
}
pParam->epoch = tmq->epoch;
......@@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
code = terrno;
goto end;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
code = terrno;
goto end;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
code = terrno;
goto end;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(msg);
destroyCommonInfo(pCommon);
return terrno;
code = terrno;
goto end;
}
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
......@@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
}
tsem_wait(&pCommon->rsp);
int32_t code = pCommon->code;
code = pCommon->code;
terrno = code;
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*assignment);
*assignment = NULL;
*numOfAssignment = 0;
} else {
int32_t num = taosArrayGetSize(pCommon->pList);
for(int32_t i = 0; i < num; ++i) {
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
}
*numOfAssignment = num;
goto end;
}
int32_t num = taosArrayGetSize(pCommon->pList);
for(int32_t i = 0; i < num; ++i) {
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
}
*numOfAssignment = num;
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
tmq_topic_assignment* p = &(*assignment)[j];
......@@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pOffsetInfo->committedOffset.version = p->currentOffset;
}
}
}
destroyCommonInfo(pCommon);
return code;
} else {
return TSDB_CODE_SUCCESS;
end:
if(code != TSDB_CODE_SUCCESS){
taosMemoryFree(*assignment);
*assignment = NULL;
*numOfAssignment = 0;
}
destroyCommonInfo(pCommon);
taosWUnLockLatch(&tmq->lock);
return code;
}
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
......@@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
char tname[128] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
taosWLockLatch(&tmq->lock);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
}
......@@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
}
......@@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
}
if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
}
......@@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
taosWUnLockLatch(&tmq->lock);
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if (pInfo == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册