From 963fb9b9f61040d3e8ac64b3070fdc4475e9d428 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 27 Jan 2022 18:05:23 +0800 Subject: [PATCH] fix mq rebalance --- source/client/src/clientImpl.c | 10 ++++++++-- source/dnode/mnode/impl/src/mndSubscribe.c | 21 ++++++++++++++++----- source/dnode/vnode/inc/vnode.h | 14 ++++++++++++++ source/dnode/vnode/src/tq/tq.c | 8 +++++++- 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 56f89e30c4..227377c4a4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -628,6 +628,10 @@ struct tmq_message_t { }; int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + if (code == -1) { + printf("discard\n"); + return 0; + } SMqClientVg* pVg = (SMqClientVg*)param; SMqConsumeRsp rsp; tDecodeSMqConsumeRsp(pMsg->pData, &rsp); @@ -680,6 +684,8 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { + + printf("exit wait %d\n", pParam->wait); if (pParam->wait) { tsem_post(&tmq->rspSem); } @@ -691,9 +697,9 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); int32_t sz = taosArrayGetSize(rsp.topics); // TODO: lock + printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size); + printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size); if (rsp.epoch != tmq->epoch) { - /*printf("rsp epoch %ld", rsp.epoch);*/ - /*printf("tmq epoch %ld", tmq->epoch);*/ //TODO if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 80ff6fd8bf..82173833ab 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -104,9 +104,6 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { if (found == 0) { taosArrayPush(pSub->availConsumer, &consumerId); } - SSdbRaw* pRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - sdbWriteNotFree(pMnode->pSdb, pRaw); int32_t assignedSz = taosArrayGetSize(pSub->assigned); topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); @@ -126,7 +123,9 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { taosArrayPush(rsp.topics, &topicEp); } if (changed || found) { - + SSdbRaw* pRaw = mndSubActionEncode(pSub); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + sdbWriteNotFree(pMnode->pSdb, pRaw); } mndReleaseSubscribe(pMnode, pSub); } @@ -183,12 +182,15 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } // TODO: acquire consumer, set status to unavail } +#if 0 SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId); pConsumer->epoch++; + printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size); SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer); sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbWriteNotFree(pMnode->pSdb, pRaw); mndReleaseConsumer(pMnode, pConsumer); +#endif } } if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) { @@ -207,6 +209,13 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { taosArrayPush(pSub->assigned, pCEp); pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); + SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId); + pConsumer->epoch++; + /*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ + /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ + /*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/ + mndReleaseConsumer(pMnode, pConsumer); + // build msg SMqSetCVgReq req = {0}; @@ -216,8 +225,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { req.logicalPlan = pTopic->logicalPlan; req.physicalPlan = pTopic->physicalPlan; req.qmsg = pCEp->qmsg; + req.newConsumerId = consumerId; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *buf = malloc(tlen); + void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -262,6 +272,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas SArray *pArray; SArray *inner = taosArrayGet(pDag->pSubplans, 0); SSubplan *plan = taosArrayGetP(inner, 0); + plan->execNode.inUse = 0; strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); plan->execNode.epAddr[0].port = 6030; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c8b47bf4a6..6fd7a99f54 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -71,6 +71,7 @@ typedef struct { typedef struct STqReadHandle { int64_t ver; uint64_t tbUid; + SHashObj* tbIdHash; SSubmitMsg* pMsg; SSubmitBlk* pBlock; SSubmitMsgIter msgIter; @@ -211,6 +212,19 @@ static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t t pHandle->tbUid = tbUid; } +static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + return -1; + } + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + //pHandle->tbUid = tbUid; + } + return 0; +} + void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle* pHandle); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ecd76e1b5..aaa04d6259 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -679,7 +679,13 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - ASSERT(pConsumer); + if (pConsumer == NULL) { + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + rpcSendResponse(pMsg); + return 0; + } int sz = taosArrayGetSize(pConsumer->topics); for (int i = 0; i < sz; i++) { -- GitLab