提交 e10ec841 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 fd8cad28
...@@ -458,6 +458,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -458,6 +458,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
pOffset->val = pVg->currentOffset; pOffset->val = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId); int32_t groupLen = strlen(tmq->groupId);
...@@ -471,11 +472,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -471,11 +472,13 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) { if (buf == NULL) {
taosMemoryFree(pOffset); taosMemoryFree(pOffset);
return -1; return -1;
} }
((SMsgHead*)buf)->vgId = htonl(pVg->vgId); ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
...@@ -492,6 +495,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -492,6 +495,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; return -1;
} }
pParam->params = pParamSet; pParam->params = pParamSet;
pParam->pOffset = pOffset; pParam->pOffset = pOffset;
...@@ -503,14 +507,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -503,14 +507,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
taosMemoryFree(pParam); taosMemoryFree(pParam);
return -1; return -1;
} }
pMsgSendInfo->msgInfo = (SDataBuf){ pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf, .pData = buf,
.len = sizeof(SMsgHead) + len, .len = sizeof(SMsgHead) + len,
.handle = NULL, .handle = NULL,
}; };
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey, SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
pVg->vgId, pOffset->val.version); tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64" prev:%"PRId64", ep:%s:%d", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
// TODO: put into cb // TODO: put into cb
pVg->committedOffset = pVg->currentOffset; pVg->committedOffset = pVg->currentOffset;
...@@ -637,15 +643,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, ...@@ -637,15 +643,16 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
// init as 1 to prevent concurrency issue // init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1; pParamSet->waitingRspNum = 1;
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%"PRIx64" start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue; continue;
} }
...@@ -1085,7 +1092,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1085,7 +1092,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
tNameExtractFullName(&name, topicFName); tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName); tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName); taosArrayPush(req.topicNames, &topicFName);
} }
...@@ -1398,7 +1405,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1398,7 +1405,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
} }
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
return set; return set;
} }
...@@ -1548,7 +1555,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1548,7 +1555,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async); tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%"PRIx64, tmq->consumerId, async, tmq->consumerId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
...@@ -1759,6 +1766,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1759,6 +1766,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) { while (1) {
SMqRspWrapper* rspWrapper = NULL; SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) { if (rspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&rspWrapper); taosGetQitem(tmq->qall, (void**)&rspWrapper);
...@@ -1881,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1881,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed // in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册