提交 7510a0e7 编写于 作者: L Liu Jicong

remove createRequest in poll

上级 01cac6b0
...@@ -202,6 +202,26 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) { ...@@ -202,6 +202,26 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
return 0; return 0;
} }
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
...@@ -845,6 +865,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { ...@@ -845,6 +865,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
if (pVg->vgId == pOffset->vgId) { if (pVg->vgId == pOffset->vgId) {
pVg->currentOffset = pOffset->offset; pVg->currentOffset = pOffset->offset;
tmqClearUnhandleMsg(tmq);
return TMQ_RESP_ERR__SUCCESS; return TMQ_RESP_ERR__SUCCESS;
} }
} }
...@@ -883,26 +904,6 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien ...@@ -883,26 +904,6 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien
return pReq; return pReq;
} }
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
taosFreeQitem(msg);
else
break;
}
}
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL; tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
...@@ -919,29 +920,35 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -919,29 +920,35 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
// TODO: out of mem // TODO: out of mem
return NULL; return NULL;
} }
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam));
if (param == NULL) { SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem // TODO: out of mem
return NULL; return NULL;
} }
param->tmq = tmq; pParam->tmq = tmq;
param->pVg = pVg; pParam->pVg = pVg;
param->epoch = tmq->epoch; pParam->epoch = tmq->epoch;
param->sync = 1; pParam->sync = 1;
param->msg = &msg; pParam->msg = &msg;
tsem_init(&param->rspSem, 0, 0); tsem_init(&pParam->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
return NULL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq, .pData = pReq,
.len = sizeof(SMqConsumeReq), .len = sizeof(SMqConsumeReq),
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = generateRequestId();
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = param; sendInfo->param = pParam;
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_CONSUME;
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
...@@ -950,7 +957,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -950,7 +957,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
pVg->pollCnt++; pVg->pollCnt++;
tmq->pollCnt++; tmq->pollCnt++;
tsem_wait(&param->rspSem); tsem_wait(&pParam->rspSem);
tmq_message_t* nmsg = NULL; tmq_message_t* nmsg = NULL;
while (1) { while (1) {
taosReadQitem(tmq->mqueue, (void**)&nmsg); taosReadQitem(tmq->mqueue, (void**)&nmsg);
...@@ -978,32 +985,40 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -978,32 +985,40 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) { if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
return -1; return -1;
} }
SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); SMqPollCbParam* pParam = malloc(sizeof(SMqPollCbParam));
if (param == NULL) { if (pParam == NULL) {
free(pReq);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
return -1; return -1;
} }
param->tmq = tmq; pParam->tmq = tmq;
param->pVg = pVg; pParam->pVg = pVg;
param->epoch = tmq->epoch; pParam->epoch = tmq->epoch;
param->sync = 0; pParam->sync = 0;
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
free(pReq);
free(pParam);
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
return -1;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq, .pData = pReq,
.len = sizeof(SMqConsumeReq), .len = sizeof(SMqConsumeReq),
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = generateRequestId();
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = param; sendInfo->param = pParam;
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_CONSUME;
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
...@@ -1053,7 +1068,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese ...@@ -1053,7 +1068,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg; return rspMsg;
} else { } else {
printf("epoch mismatch\n"); /*printf("epoch mismatch\n");*/
taosFreeQitem(rspMsg); taosFreeQitem(rspMsg);
} }
} else { } else {
...@@ -1107,9 +1122,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1107,9 +1122,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
while (1) { while (1) {
/*printf("cycle\n");*/ /*printf("cycle\n");*/
if (atomic_load_32(&tmq->waitingRequest) == 0) { tmqPollImpl(tmq, blocking_time);
tmqPollImpl(tmq, blocking_time);
}
tsem_wait(&tmq->rspSem); tsem_wait(&tmq->rspSem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册