提交 8f6ac6d7 编写于 作者: H Haojun Liao

fix(tmq): wait for 2mins when subscribe topics.

上级 69ca2b2f
...@@ -1116,6 +1116,7 @@ _failed: ...@@ -1116,6 +1116,7 @@ _failed:
} }
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 40) { if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL; goto FAIL;
} }
......
...@@ -159,7 +159,7 @@ typedef struct { ...@@ -159,7 +159,7 @@ typedef struct {
int32_t size; int32_t size;
} STqOffsetHead; } STqOffsetHead;
STqOffsetStore* tqOffsetOpen(); STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册