提交 bca1af3f 编写于 作者: wmmhello's avatar wmmhello

fix:change tmq subscribe try time to 60mins

上级 50bdfef7
......@@ -1089,7 +1089,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const int32_t MAX_RETRY_COUNT = 120 * 60; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
void* buf = NULL;
......@@ -1186,7 +1186,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
......
......@@ -73,8 +73,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
STqHandle *pHandle = (STqHandle*)handle;
int32_t vgId = TD_VID(pTq->pVnode);
if(taosHashGetSize(pTq->pPushMgr) <= 0) {
return 0;
}
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册