diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 82da067d8e25d9f82df466733aaa67d35f766551..111ca28cdc4153cde2ee07f54b630efaa312ba84 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1116,6 +1116,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; @@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { - if (retryCnt++ > 40) { + if (retryCnt++ > MAX_RETRY_COUNT) { goto FAIL; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 379ea25ee68e0238da34a8b6d92d66fa99c37ba3..1d7c213e1aff8c59fe1860896269ee34bc14aa25 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -159,7 +159,7 @@ typedef struct { int32_t size; } STqOffsetHead; -STqOffsetStore* tqOffsetOpen(); +STqOffsetStore* tqOffsetOpen(STQ* pTq); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);