未验证 提交 caec1313 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20628 from taosdata/fix/liaohj

fix(tmq): wait for 2mins when subscribe topics.
...@@ -1116,6 +1116,7 @@ _failed: ...@@ -1116,6 +1116,7 @@ _failed:
} }
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1209,7 +1210,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 40) { if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL; goto FAIL;
} }
...@@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1811,7 +1812,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
taosReadAllQitems(tmq->mqueue, tmq->qall); taosReadAllQitems(tmq->mqueue, tmq->qall);
taosGetQitem(tmq->qall, (void**)&pRspWrapper); taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
return NULL; return NULL;
} }
...@@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1831,7 +1831,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) { if (pDataRsp->head.epoch == consumerEpoch) {
// todo fix it: race condition
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
// update the epset // update the epset
...@@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1843,6 +1842,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values.
pVg->currentOffset = pDataRsp->rspOffset; pVg->currentOffset = pDataRsp->rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
......
...@@ -109,23 +109,18 @@ typedef struct { ...@@ -109,23 +109,18 @@ typedef struct {
} STqPushEntry; } STqPushEntry;
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
int64_t walLogLastVer; int64_t walLogLastVer;
SRWLatch lock;
SRWLatch pushLock; SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pPushMgr; // consumerId -> STqPushEntry SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
TDB* pMetaDB;
TDB* pMetaDB; TTB* pExecStore;
TTB* pExecStore; TTB* pCheckStore;
TTB* pCheckStore; SStreamMeta* pStreamMeta;
SStreamMeta* pStreamMeta;
}; };
typedef struct { typedef struct {
...@@ -164,7 +159,7 @@ typedef struct { ...@@ -164,7 +159,7 @@ typedef struct {
int32_t size; int32_t size;
} STqOffsetHead; } STqOffsetHead;
STqOffsetStore* tqOffsetOpen(); STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
......
此差异已折叠。
...@@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost // lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) { if (numOfRegisteredPush > 0) {
...@@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosArrayDestroy(cachedKeyLens); taosArrayDestroy(cachedKeyLens);
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
...@@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
taosMemoryFree(data); taosMemoryFree(data);
} }
// unlock // unlock
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->lock);
} }
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) { if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册