提交 1e01f902 编写于 作者: H Haojun Liao

refactor(tmq): do some internal refactor.

上级 9d680a09
......@@ -214,6 +214,8 @@ typedef struct {
} SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
......@@ -380,42 +382,6 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
return container->pData;
}
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
if (tmq == NULL) {
if (!pParamSet->async) {
tsem_destroy(&pParamSet->rspSem);
}
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
}
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && tmq->commitCb) {
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
} else {
tsem_post(&pParamSet->rspSem);
}
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
return 0;
}
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if (waitingRspNum == 0) {
......@@ -728,8 +694,8 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
return code;
}
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
if (msg) {
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else {
......@@ -1550,90 +1516,6 @@ END:
return code;
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0;
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
taosMemoryFree(pReq);
return -1;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
return -1;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
}
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int32_t groupLen = strlen(tmq->groupId);
memcpy(pReq->subKey, tmq->groupId, groupLen);
......@@ -2136,3 +2018,123 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void*
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0;
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
taosMemoryFree(pReq);
return -1;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
return -1;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParamSet->refId);
if (tmq == NULL) {
if (!pParamSet->async) {
tsem_destroy(&pParamSet->rspSem);
}
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
}
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && tmq->commitCb) {
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
} else {
tsem_post(&pParamSet->rspSem);
}
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册