提交 66899bd8 编写于 作者: L Liu Jicong

fix(tmq): auto unsubscribe when consumer close

上级 77757f3e
......@@ -196,8 +196,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
if (cnt >= 2) break;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
......@@ -253,39 +254,6 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
if (argc > 1) {
printf("env init\n");
......@@ -296,7 +264,6 @@ int main(int argc, char* argv[]) {
}
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -230,7 +230,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
......
......@@ -2412,7 +2412,7 @@ typedef struct {
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t waitTime;
int64_t timeout;
int64_t currentOffset;
} SMqPollReq;
......
......@@ -1243,7 +1243,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return TMQ_RESP_ERR__FAIL;
}
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
......@@ -1269,7 +1269,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->waitTime = waitTime;
pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset;
......@@ -1297,7 +1297,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
......@@ -1318,7 +1318,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
......@@ -1388,7 +1388,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return 0;
}
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
......@@ -1428,17 +1428,17 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
taosFreeQitem(rspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, waitTime);
tmqPollImpl(tmq, timeout);
}
}
}
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
......@@ -1450,16 +1450,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
if (wait_time != 0) {
if (timeout != 0) {
int64_t endTime = taosGetTimestampMs();
int64_t leftTime = endTime - startTime;
if (leftTime > wait_time) {
if (leftTime > timeout) {
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
return NULL;
}
......@@ -1474,10 +1474,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
......@@ -1485,10 +1482,7 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
}
......
......@@ -369,7 +369,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
......@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
} else {
mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
......@@ -686,4 +686,4 @@ void mndReleaseSyncRef(SMnode *pMnode) {
int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
mTrace("mnode sync is released, ref:%d", ref);
taosThreadRwlockUnlock(&pMnode->lock);
}
\ No newline at end of file
}
......@@ -66,12 +66,12 @@ struct STqReadHandle {
// tqPush
typedef struct {
int64_t consumerId;
int32_t epoch;
int32_t skipLogNum;
int64_t reqOffset;
SRWLatch lock;
SRpcMsg* handle;
int64_t consumerId;
int32_t epoch;
int32_t skipLogNum;
int64_t reqOffset;
SRpcHandleInfo info;
SRWLatch lock;
} STqPushHandle;
#if 0
......
......@@ -180,40 +180,6 @@ void tqClose(STQ* pTq) {
// TODO
}
#if 0
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
/*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
/*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
#endif
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
......@@ -290,8 +256,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
taosWLockLatch(&pHandle->pushHandle.lock);
SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle);
ASSERT(pMsg);
/*SRpcHandleInfo* pInfo = atomic_load_ptr(&pHandle->pushHandle.pInfo);*/
/*ASSERT(pInfo);*/
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset;
......@@ -318,7 +284,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
// todo free
return -1;
}
......@@ -329,10 +295,16 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
SRpcMsg resp = {
.info = pHandle->pushHandle.info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
atomic_store_ptr(&pHandle->pushHandle.handle, NULL);
/*atomic_store_ptr(&pHandle->pushHandle.pInfo, NULL);*/
memset(&pHandle->pushHandle.info, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
......@@ -374,7 +346,7 @@ int tqCommit(STQ* pTq) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t waitTime = pReq->waitTime;
int64_t waitTime = pReq->timeout;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
......@@ -410,24 +382,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
rsp.withTbName = pReq->withTbName;
if (rsp.withTbName) {
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
rsp.withSchema = false;
rsp.withTag = false;
} else {
rsp.withSchema = true;
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.withTag = false;
}
/*int8_t withTbName = pExec->withTbName;*/
/*if (pReq->withTbName != -1) {*/
/*withTbName = pReq->withTbName;*/
/*}*/
/*rsp.withTbName = withTbName;*/
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
......@@ -443,15 +413,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SWalReadHead* pHead = &pHeadWithCkSum->head;
#if 0
SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
......@@ -473,10 +434,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
#endif
break;
}
#endif
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
......@@ -533,8 +490,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
if (rsp.withSchema) {
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (rsp.withTbName) {
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册