diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7cd8ffb6f3b8f2e9a82f72263bd47d02e6627c9d..48fb958ef0b6532091e2aac696af4a1565d2a9b6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1104,10 +1104,14 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq return buf; } -typedef struct SMqTmrMsg { +typedef struct { int32_t reserved; } SMqTmrMsg; +typedef struct { + int64_t consumerId; +} SMqDoRebalanceMsg; + typedef struct { int64_t status; } SMVSubscribeRsp; @@ -1685,13 +1689,13 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) return buf; } -typedef struct SMqTbData { +typedef struct { int64_t uid; int32_t numOfRows; char* colData; } SMqTbData; -typedef struct SMqTopicBlk { +typedef struct { char topicName[TSDB_TOPIC_FNAME_LEN]; int64_t committedOffset; int64_t reqOffset; @@ -1702,7 +1706,7 @@ typedef struct SMqTopicBlk { SMqTbData* tbData; } SMqTopicData; -typedef struct SMqConsumeRsp { +typedef struct { int64_t consumerId; SSchemaWrapper* schemas; int64_t committedOffset; @@ -1714,7 +1718,7 @@ typedef struct SMqConsumeRsp { } SMqConsumeRsp; // one req for one vg+topic -typedef struct SMqConsumeReq { +typedef struct { SMsgHead head; //0: commit only, current offset //1: consume only, poll next offset @@ -1730,17 +1734,17 @@ typedef struct SMqConsumeReq { char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; -typedef struct SMqSubVgEp { +typedef struct { int32_t vgId; SEpSet epSet; } SMqSubVgEp; -typedef struct SMqSubTopicEp { +typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; SArray* vgs; // SArray } SMqSubTopicEp; -typedef struct SMqCMGetSubEpRsp { +typedef struct { int64_t consumerId; int64_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index e81dd798f668f9f37d73e5d4b4ed66bae08e61d2..f95fe8dd0806185ce898b00ec6515bde4e4be24b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -141,7 +141,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index de4b4e4e89d1418391aa6daf519c77bea8013c72..e1306800d3e3c9491bad297130d23ba83932ec6e 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -27,6 +27,7 @@ typedef struct SMnodeMsg SMnodeMsg; typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); +typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef struct SMnodeLoad { @@ -64,6 +65,7 @@ typedef struct { SMnodeCfg cfg; SDnode *pDnode; PutReqToMWriteQFp putReqToMWriteQFp; + PutReqToMReadQFp putReqToMReadQFp; SendReqToDnodeFp sendReqToDnodeFp; SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index e57901ed2ed8a4fe030c405d93e913fc237cb00c..a820d68e504d98f91d9793eda1f1673ae5cc766f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -209,6 +209,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SName name = {0}; char* dbName = getDbOfConnection(tmq->pTscObj); + if (dbName == NULL) { + return TMQ_RESP_ERR__FAIL; + } tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); tNameFromString(&name, topicName, T_NAME_TABLE); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1e68faa4f471e0467b0d6d2ba0ad9123a9918c16..9c4da6cd5bcfed3605f47f202ebc0f0f62c32ce1 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -565,7 +565,6 @@ TEST(testCase, insert_test) { #endif -#if 0 TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -585,7 +584,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 100000; ++i) { + for(int32_t i = 0; i < 10000000; ++i) { char sql[512] = {0}; sprintf(sql, "insert into tu values(now+%da, %d)", i, i); TAOS_RES* p = taos_query(pConn, sql); @@ -616,6 +615,7 @@ TEST(testCase, projection_query_tables) { taos_free_result(pRes); taos_close(pConn); } +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/client/test/tmqTest.cpp b/source/client/test/tmqTest.cpp index f767e7faef15d5bfd32230d4d4758a2dd057d580..9f8ff7143a822cfe3abb69b1475f884397501436 100644 --- a/source/client/test/tmqTest.cpp +++ b/source/client/test/tmqTest.cpp @@ -46,7 +46,6 @@ TEST(testCase, create_topic_ctb_Test) { if (taos_errno(pRes) != 0) { printf("error in use db, reason:%s\n", taos_errstr(pRes)); } - //taos_free_result(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes); ASSERT_TRUE(pFields == nullptr); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index f952f69f20d1ef6168e1e93c820cf454626e9b09..20585f269badad0ca02e08d3d17196261c1af05c 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -256,6 +256,12 @@ static bool dndNeedDeployMnode(SDnode *pDnode) { static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); + return 0; +} + +static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg* pRpcMsg) { + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); + return 0; } static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { @@ -264,6 +270,7 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dndSendRedirectRsp; pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ; + pOption->putReqToMReadQFp = dndPutMsgToMReadQ; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->env.sver; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 42b2de01cc4ff38d53b310741c1ccf9677f540ae..e91a9876c53447370d7ae411a6bf94a6e799281c 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -28,6 +28,8 @@ void mndCleanupConsumer(SMnode *pMnode); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); +SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup); + SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e8a9a68466919c0bd77857467e9f859635edb67f..9609deb235e86dd6f186e68858b7af142494e204 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -341,42 +341,22 @@ typedef struct { char payload[]; } SShowObj; -#if 0 -typedef struct SConsumerObj { - uint64_t uid; - int64_t createTime; - int64_t updateTime; - //uint64_t dbUid; - int32_t version; - SRWLatch lock; - SArray* topics; -} SConsumerObj; - -typedef struct SMqTopicConsumer { - int64_t consumerId; - SList* topicList; -} SMqTopicConsumer; -#endif - -typedef struct SMqConsumerEp { - int32_t vgId; // -1 for unassigned - int32_t status; - SEpSet epSet; - int64_t consumerId; // -1 for unassigned - int64_t lastConsumerHbTs; - int64_t lastVgHbTs; - char* qmsg; +typedef struct { + int32_t vgId; // -1 for unassigned + int32_t status; + SEpSet epSet; + int64_t oldConsumerId; + int64_t consumerId; // -1 for unassigned + char* qmsg; } SMqConsumerEp; -static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { +static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pConsumerEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); + tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += taosEncodeFixedI64(buf, pConsumerEp->lastConsumerHbTs); - tlen += taosEncodeFixedI64(buf, pConsumerEp->lastVgHbTs); - //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); tlen += taosEncodeString(buf, pConsumerEp->qmsg); return tlen; } @@ -385,10 +365,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); + buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeFixedI64(buf, &pConsumerEp->lastConsumerHbTs); - buf = taosDecodeFixedI64(buf, &pConsumerEp->lastVgHbTs); - //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); buf = taosDecodeString(buf, &pConsumerEp->qmsg); return buf; } @@ -399,97 +377,89 @@ static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { } } -// unit for rebalance -typedef struct SMqSubscribeObj { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - int32_t epoch; - // TODO: replace with priority queue - int32_t nextConsumerIdx; - SArray* availConsumer; // SArray (consumerId) - SArray* assigned; // SArray - SArray* idleConsumer; // SArray - SArray* lostConsumer; // SArray - SArray* unassignedVg; // SArray -} SMqSubscribeObj; +typedef struct { + int64_t consumerId; + SArray* vgInfo; // SArray +} SMqSubConsumer; -static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { - SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); - if (pSub == NULL) { - return NULL; +static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsumer* pConsumer) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); + int32_t sz = taosArrayGetSize(pConsumer->vgInfo); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); } - pSub->key[0] = 0; - pSub->epoch = 0; + return tlen; +} - pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); - if (pSub->availConsumer == NULL) { - free(pSub); - return NULL; +static FORCE_INLINE void* tDecodeSMqSubConsumer(void** buf, SMqSubConsumer* pConsumer) { + int32_t sz; + buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); + buf = taosDecodeFixedI32(buf, &sz); + pConsumer->vgInfo = taosArrayInit(sz, sizeof(SMqConsumerEp)); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp consumerEp; + buf = tDecodeSMqConsumerEp(buf, &consumerEp); + taosArrayPush(pConsumer->vgInfo, &consumerEp); } - pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp)); - if (pSub->assigned == NULL) { - taosArrayDestroy(pSub->availConsumer); - free(pSub); - return NULL; + return buf; +} + +static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) { + if (pSubConsumer->vgInfo) { + taosArrayDestroyEx(pSubConsumer->vgInfo, (void (*)(void*))tDeleteSMqConsumerEp); + pSubConsumer->vgInfo = NULL; } - pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); - if (pSub->lostConsumer == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - free(pSub); +} + +typedef struct { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + int32_t status; + int32_t vgNum; + SArray* consumers; // SArray + SArray* unassignedVg; // SArray +} SMqSubscribeObj; + +static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { + SMqSubscribeObj* pSub = calloc(1, sizeof(SMqSubscribeObj)); + if (pSub == NULL) { return NULL; } - pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); - if (pSub->idleConsumer == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - taosArrayDestroy(pSub->lostConsumer); - free(pSub); - return NULL; + pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer)); + if (pSub->consumers == NULL) { + goto _err; } pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); if (pSub->unassignedVg == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - taosArrayDestroy(pSub->lostConsumer); - taosArrayDestroy(pSub->idleConsumer); - free(pSub); - return NULL; + goto _err; } + + pSub->key[0] = 0; + pSub->vgNum = 0; + pSub->status = 0; + return pSub; + +_err: + tfree(pSub->unassignedVg); + tfree(pSub->consumers); + tfree(pSub); + return NULL; } static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { int32_t tlen = 0; tlen += taosEncodeString(buf, pSub->key); - tlen += taosEncodeFixedI32(buf, pSub->epoch); + tlen += taosEncodeFixedI32(buf, pSub->vgNum); + tlen += taosEncodeFixedI32(buf, pSub->status); int32_t sz; - sz = taosArrayGetSize(pSub->availConsumer); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i); - tlen += taosEncodeFixedI64(buf, *pConsumerId); - } - - sz = taosArrayGetSize(pSub->assigned); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); - tlen += tEncodeSMqConsumerEp(buf, pCEp); - } - - sz = taosArrayGetSize(pSub->lostConsumer); + sz = taosArrayGetSize(pSub->consumers); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp* pCEp = taosArrayGet(pSub->lostConsumer, i); - tlen += tEncodeSMqConsumerEp(buf, pCEp); - } - - sz = taosArrayGetSize(pSub->idleConsumer); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp* pCEp = taosArrayGet(pSub->idleConsumer, i); - tlen += tEncodeSMqConsumerEp(buf, pCEp); + SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i); + tlen += tEncodeSMqSubConsumer(buf, pSubConsumer); } sz = taosArrayGetSize(pSub->unassignedVg); @@ -504,68 +474,25 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) { buf = taosDecodeStringTo(buf, pSub->key); - buf = taosDecodeFixedI32(buf, &pSub->epoch); + buf = taosDecodeFixedI32(buf, &pSub->vgNum); + buf = taosDecodeFixedI32(buf, &pSub->status); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pSub->availConsumer = taosArrayInit(sz, sizeof(int64_t)); - if (pSub->availConsumer == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - int64_t consumerId; - buf = taosDecodeFixedI64(buf, &consumerId); - taosArrayPush(pSub->availConsumer, &consumerId); - } - - buf = taosDecodeFixedI32(buf, &sz); - pSub->assigned = taosArrayInit(sz, sizeof(SMqConsumerEp)); - if (pSub->assigned == NULL) { - taosArrayDestroy(pSub->availConsumer); + pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer)); + if (pSub->consumers == NULL) { return NULL; } for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &cEp); - taosArrayPush(pSub->assigned, &cEp); + SMqSubConsumer subConsumer = {0}; + buf = tDecodeSMqSubConsumer(buf, &subConsumer); + taosArrayPush(pSub->consumers, &subConsumer); } - buf = taosDecodeFixedI32(buf, &sz); - pSub->lostConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); - if (pSub->lostConsumer == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &cEp); - taosArrayPush(pSub->lostConsumer, &cEp); - } - - buf = taosDecodeFixedI32(buf, &sz); - pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); - if (pSub->idleConsumer == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - taosArrayDestroy(pSub->lostConsumer); - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp cEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &cEp); - taosArrayPush(pSub->idleConsumer, &cEp); - } - - buf = taosDecodeFixedI32(buf, &sz); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); if (pSub->unassignedVg == NULL) { - taosArrayDestroy(pSub->availConsumer); - taosArrayDestroy(pSub->assigned); - taosArrayDestroy(pSub->lostConsumer); - taosArrayDestroy(pSub->idleConsumer); return NULL; } for (int32_t i = 0; i < sz; i++) { @@ -573,50 +500,29 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) buf = tDecodeSMqConsumerEp(buf, &cEp); taosArrayPush(pSub->unassignedVg, &cEp); } - return buf; } static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { - if (pSub->availConsumer) { - taosArrayDestroy(pSub->availConsumer); - pSub->availConsumer = NULL; - } - if (pSub->assigned) { - //taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp); - taosArrayDestroy(pSub->assigned); - pSub->assigned = NULL; + if (pSub->consumers) { + taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); + //taosArrayDestroy(pSub->consumers); + pSub->consumers = NULL; } + if (pSub->unassignedVg) { - //taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); - taosArrayDestroy(pSub->unassignedVg); + taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); + //taosArrayDestroy(pSub->unassignedVg); pSub->unassignedVg = NULL; } - if (pSub->idleConsumer) { - //taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp); - taosArrayDestroy(pSub->idleConsumer); - pSub->idleConsumer = NULL; - } - if (pSub->lostConsumer) { - //taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp); - taosArrayDestroy(pSub->lostConsumer); - pSub->lostConsumer = NULL; - } } -typedef struct SMqCGroup { - char name[TSDB_CONSUMER_GROUP_LEN]; - int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal - SList* consumerIds; // SList - SList* idleVGroups; // SList -} SMqCGroup; - -typedef struct SMqTopicObj { +typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; int64_t createTime; int64_t updateTime; - uint64_t uid; + int64_t uid; int64_t dbUid; int32_t version; SRWLatch lock; @@ -624,79 +530,23 @@ typedef struct SMqTopicObj { char* sql; char* logicalPlan; char* physicalPlan; - // SHashObj *cgroups; // SHashObj - // SHashObj *consumers; // SHashObj } SMqTopicObj; -// TODO: add cache and change name to id -typedef struct SMqConsumerTopic { - char name[TSDB_TOPIC_FNAME_LEN]; - int32_t epoch; - // vg assigned to the consumer on the topic - SArray* pVgInfo; // SArray -} SMqConsumerTopic; - -static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, - SMqSubscribeObj* pSub, int64_t* oldConsumerId) { - SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic)); - if (pCTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - strcpy(pCTopic->name, pTopic->name); - pCTopic->epoch = 0; - pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t)); - - int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg); - if (unassignedVgSz > 0) { - SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg); - *oldConsumerId = pCEp->consumerId; - pCEp->consumerId = consumerId; - taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId); - taosArrayPush(pSub->assigned, pCEp); - } - return pCTopic; -} - -static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pConsumerTopic->name); - tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); - int32_t sz = 0; - if (pConsumerTopic->pVgInfo != NULL) { - sz = taosArrayGetSize(pConsumerTopic->pVgInfo); - } - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); - tlen += taosEncodeFixedI32(buf, *pVgInfo); - } - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) { - buf = taosDecodeStringTo(buf, pConsumerTopic->name); - buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic)); - for (int32_t i = 0; i < sz; i++) { - int32_t vgInfo; - buf = taosDecodeFixedI32(buf, &vgInfo); - taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo); - } - return buf; -} - -typedef struct SMqConsumerObj { +typedef struct { int64_t consumerId; int64_t connId; SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; - SArray* topics; // SArray + SArray* topics; // SArray int64_t epoch; // stat - int64_t pollCnt; + int64_t pollCnt; + // status + int32_t status; + // heartbeat from the consumer reset hbStatus to 0 + // each checkConsumerAlive msg add hbStatus by 1 + // if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost + int32_t hbStatus; } SMqConsumerObj; static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { @@ -709,88 +559,29 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO int32_t sz = taosArrayGetSize(pConsumer->topics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i); - tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic); + char* topic = taosArrayGetP(pConsumer->topics, i); + tlen += taosEncodeString(buf, topic); } return tlen; } static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { + int32_t sz; buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); buf = taosDecodeFixedI64(buf, &pConsumer->connId); buf = taosDecodeFixedI64(buf, &pConsumer->epoch); buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); buf = taosDecodeStringTo(buf, pConsumer->cgroup); - int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj)); for (int32_t i = 0; i < sz; i++) { - SMqConsumerTopic cTopic; - buf = tDecodeSMqConsumerTopic(buf, &cTopic); - taosArrayPush(pConsumer->topics, &cTopic); + char* topic; + buf = taosDecodeString(buf, &topic); + taosArrayPush(pConsumer->topics, &topic); } return buf; } -typedef struct SMqSubConsumerObj { - int64_t consumerUid; // if -1, unassigned - SList* vgId; // SList -} SMqSubConsumerObj; - -typedef struct SMqSubCGroupObj { - char name[TSDB_CONSUMER_GROUP_LEN]; - SList* consumers; // SList -} SMqSubCGroupObj; - -typedef struct SMqSubTopicObj { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - int64_t uid; - int64_t dbUid; - int32_t version; - SRWLatch lock; - int32_t sqlLen; - char* sql; - char* logicalPlan; - char* physicalPlan; - SList* cgroups; // SList -} SMqSubTopicObj; - -typedef struct SMqConsumerSubObj { - int64_t topicUid; - SList* vgIds; // SList -} SMqConsumerSubObj; - -typedef struct SMqConsumerHbObj { - int64_t consumerId; - SList* consumerSubs; // SList -} SMqConsumerHbObj; - -typedef struct SMqVGroupSubObj { - int64_t topicUid; - SList* consumerIds; // SList -} SMqVGroupSubObj; - -typedef struct SMqVGroupHbObj { - int64_t vgId; - SList* vgSubs; // SList -} SMqVGroupHbObj; - -#if 0 -typedef struct SCGroupObj { - char name[TSDB_TOPIC_NAME_LEN]; - int64_t createTime; - int64_t updateTime; - uint64_t uid; - //uint64_t dbUid; - int32_t version; - SRWLatch lock; - SList* consumerIds; -} SCGroupObj; -#endif - typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 29ccd43622e84d194ac5ad59a231795c96b980f3..97cd6caf0476e4a275383d463caaf9d392416746 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -96,6 +96,7 @@ typedef struct SMnode { SendReqToMnodeFp sendReqToMnodeFp; SendRedirectRspFp sendRedirectRspFp; PutReqToMWriteQFp putReqToMWriteQFp; + PutReqToMReadQFp putReqToMReadQFp; } SMnode; int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index fd960eb63eda1d34071b01b88ba1fad66171fb45..3ccb2f6e5117bc15de75a682aea0a593f16c4a64 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -53,6 +53,19 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} +SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) { + SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pConsumer->epoch = 1; + pConsumer->consumerId = consumerId; + strcpy(pConsumer->cgroup, cgroup); + taosInitRWLatch(&pConsumer->lock); + return pConsumer; +} + SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_OUT_OF_MEMORY; void* buf = NULL; @@ -164,148 +177,3 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pConsumer); } - -#if 0 -static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - STableInfoReq *pInfo = pMsg->rpcMsg.pCont; - - mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); - - SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); - if (pConsumer == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_CONSUMER; - mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - taosRLockLatch(&pConsumer->lock); - int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; - int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - - STableMetaRsp *pMeta = rpcMallocCont(contLen); - if (pMeta == NULL) { - taosRUnLockLatch(&pConsumer->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseConsumer(pMnode, pConsumer); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); - pMeta->numOfTags = htonl(pConsumer->numOfTags); - pMeta->numOfColumns = htonl(pConsumer->numOfColumns); - pMeta->precision = pDb->cfg.precision; - pMeta->tableType = TSDB_SUPER_TABLE; - pMeta->update = pDb->cfg.update; - pMeta->sversion = htonl(pConsumer->version); - pMeta->tuid = htonl(pConsumer->uid); - - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pMeta->pSchema[i]; - SSchema *pSrcSchema = &pConsumer->pSchema[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = htonl(pSrcSchema->colId); - pSchema->bytes = htonl(pSrcSchema->bytes); - } - taosRUnLockLatch(&pConsumer->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseConsumer(pMnode, pConsumer); - - pMsg->pCont = pMeta; - pMsg->contLen = contLen; - - mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); - return 0; -} - -static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { - SSdb *pSdb = pMnode->pSdb; - - SDbObj *pDb = mndAcquireDb(pMnode, dbName); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; - } - - int32_t numOfConsumers = 0; - void *pIter = NULL; - while (1) { - SMqConsumerObj *pConsumer = NULL; - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; - - numOfConsumers++; - - sdbRelease(pSdb, pConsumer); - } - - *pNumOfConsumers = numOfConsumers; - return 0; -} - -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - - if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { - return -1; - } - - int32_t cols = 0; - SSchema *pSchema = pMeta->pSchema; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create_time"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htonl(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) { - pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - } - - pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - strcpy(pMeta->tbFname, mndShowStr(pShow->type)); - - return 0; -} - -static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { - SSdb *pSdb = pMnode->pSdb; - sdbCancelFetch(pSdb, pIter); -} -#endif diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 938a3eb89e61946447341855364e94f7e066715e..4824f2d06b5eca07659e58e2903bc175a3ae689f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -31,9 +31,20 @@ #define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 -#define MND_SUBSCRIBE_REBALANCE_MS 5000 +#define MND_SUBSCRIBE_REBALANCE_CNT 3 -static char *mndMakeSubscribeKey(char *cgroup, char *topicName); +enum { + MQ_CONSUMER_STATUS__INIT = 1, + MQ_CONSUMER_STATUS__ACTIVE, + MQ_CONSUMER_STATUS__LOST, +}; + +enum { + MQ_SUBSCRIBE_STATUS__ACTIVE = 1, + MQ_SUBSCRIBE_STATUS__DELETED, +}; + +static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); @@ -48,9 +59,10 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); -static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub, - int64_t oldConsumerId); +static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pSub); + +static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -68,12 +80,140 @@ int32_t mndInitSubscribe(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } +static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *consumerGroup) { + SMqSubscribeObj *pSub = tNewSubscribeObj(); + if (pSub == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char *key = mndMakeSubscribeKey(consumerGroup, pTopic->name); + if (key == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tDeleteSMqSubscribeObj(pSub); + free(pSub); + return NULL; + } + strcpy(pSub->key, key); + free(key); + + if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tDeleteSMqSubscribeObj(pSub); + free(pSub); + return NULL; + } + // TODO: disable alter subscribed table + return pSub; +} + +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicObj *pTopic, + const SMqConsumerEp *pConsumerEp, const char *cgroup) { + SMqSetCVgReq req = {0}; + strcpy(req.cgroup, cgroup); + strcpy(req.topicName, pTopic->name); + req.sql = pTopic->sql; + req.logicalPlan = pTopic->logicalPlan; + req.physicalPlan = pTopic->physicalPlan; + req.qmsg = pConsumerEp->qmsg; + req.oldConsumerId = pConsumerEp->oldConsumerId; + req.newConsumerId = pConsumerEp->consumerId; + req.vgId = pConsumerEp->vgId; + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + SMsgHead *pMsgHead = (SMsgHead *)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(pConsumerEp->vgId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSMqSetCVgReq(&abuf, &req); + *pBuf = buf; + *pLen = tlen; + + return 0; +} + +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, + const SMqConsumerEp *pConsumerEp, const char *cgroup) { + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + + void *buf; + int32_t tlen; + if (mndBuildRebalanceMsg(&buf, &tlen, pTopic, pConsumerEp, cgroup) < 0) { + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + + mndReleaseVgroup(pMnode, pVgObj); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(buf); + return -1; + } + + return 0; +} + +static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) { + SMqSetCVgReq req = {0}; + req.oldConsumerId = pConsumerEp->consumerId; + req.newConsumerId = -1; + + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + SMsgHead *pMsgHead = (SMsgHead *)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(pConsumerEp->vgId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSMqSetCVgReq(&abuf, &req); + *pBuf = buf; + *pLen = tlen; + return 0; +} + +static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) { + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + + void *buf; + int32_t tlen; + if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) { + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + + mndReleaseVgroup(pMnode, pVgObj); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(buf); + return -1; + } + + return 0; +} + static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpRsp rsp = {0}; int64_t consumerId = be64toh(pReq->consumerId); - int64_t currentTs = taosGetTimestampMs(); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); if (pConsumer == NULL) { @@ -85,48 +225,34 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; rsp.epoch = pConsumer->epoch; - SArray *pTopics = pConsumer->topics; - int32_t sz = taosArrayGetSize(pTopics); - rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); - for (int32_t i = 0; i < sz; i++) { - SMqSubTopicEp topicEp; - SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i); - strcpy(topicEp.topic, pConsumerTopic->name); - - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name); - ASSERT(pSub); - bool found = 0; - bool changed = 0; - for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { - if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) { - found = 1; - break; - } - } - if (found == 0) { - taosArrayPush(pSub->availConsumer, &consumerId); - } - - int32_t assignedSz = taosArrayGetSize(pSub->assigned); - topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); - for (int32_t j = 0; j < assignedSz; j++) { - SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j); - if (pCEp->consumerId == consumerId) { - pCEp->lastConsumerHbTs = currentTs; - SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId}; - taosArrayPush(topicEp.vgs, &vgEp); - changed = 1; + if (pReq->epoch != rsp.epoch) { + SArray *pTopics = pConsumer->topics; + int sz = taosArrayGetSize(pTopics); + rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); + for (int i = 0; i < sz; i++) { + char *topicName = taosArrayGetP(pTopics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); + ASSERT(pSub); + int csz = taosArrayGetSize(pSub->consumers); + //TODO: change to bsearch + for (int j = 0; j < csz; j++) { + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); + if (consumerId == pSubConsumer->consumerId) { + int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + SMqSubTopicEp topicEp; + topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); + for (int k = 0; k < vgsz; k++) { + SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); + + SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; + taosArrayPush(topicEp.vgs, &vgEp); + } + taosArrayPush(rsp.topics, &topicEp); + break; + } } + mndReleaseSubscribe(pMnode, pSub); } - if (taosArrayGetSize(topicEp.vgs) != 0) { - taosArrayPush(rsp.topics, &topicEp); - } - if (changed || found) { - SSdbRaw *pRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - sdbWrite(pMnode->pSdb, pRaw); - } - mndReleaseSubscribe(pMnode, pSub); } int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); @@ -155,19 +281,133 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { } static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - SMqSubscribeObj *pSub = NULL; - void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); - int64_t currentTs = taosGetTimestampMs(); - int32_t sz; - while (pIter != NULL) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + SMqConsumerObj *pConsumer; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + int32_t hbStatus = atomic_fetch_add_32(&pConsumer->hbStatus, 1); + if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) { + int32_t old = + atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); + if (old == MQ_CONSUMER_STATUS__ACTIVE) { + SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); + pRebMsg->consumerId = pConsumer->consumerId; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)}; + pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + } + } + } + return 0; +} + +static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont; + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pReq->consumerId); + int topicSz = taosArrayGetSize(pConsumer->topics); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + for (int i = 0; i < topicSz; i++) { + char *topic = taosArrayGetP(pConsumer->topics, i); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); + int32_t consumerNum = taosArrayGetSize(pSub->consumers); + if (consumerNum != 0) { + int32_t vgNum = pSub->vgNum; + int32_t vgEachConsumer = vgNum / consumerNum; + int32_t left = vgNum % consumerNum; + int32_t leftUsed = 0; + SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp)); + SArray *unassignedConsumer = taosArrayInit(0, sizeof(int32_t)); + for (int32_t j = 0; j < consumerNum; j++) { + bool changed = false; + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); + int32_t vgOneConsumer = taosArrayGetSize(pSubConsumer->vgInfo); + bool canUseLeft = leftUsed < left; + if (vgOneConsumer > vgEachConsumer + canUseLeft) { + changed = true; + if (canUseLeft) leftUsed++; + // put into unassigned + while (taosArrayGetSize(pSubConsumer->vgInfo) > vgEachConsumer + canUseLeft) { + SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); + ASSERT(pConsumerEp != NULL); + taosArrayPush(unassignedVgStash, pConsumerEp); + // build msg and persist into trans + } + } else if (vgOneConsumer < vgEachConsumer) { + changed = true; + // assign from unassigned + while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) { + // if no unassgined, save j + if (taosArrayGetSize(unassignedVgStash) == 0) { + taosArrayPush(unassignedConsumer, &j); + break; + } + SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash); + ASSERT(pConsumerEp != NULL); + pConsumerEp->oldConsumerId = pConsumerEp->consumerId; + pConsumerEp->consumerId = pSubConsumer->consumerId; + taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); + // build msg and persist into trans + } + } + if (changed) { + SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); + pRebConsumer->epoch++; + SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pRebConsumer); + sdbSetRawStatus(pRebConsumer, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pConsumerRaw); + } + } + + for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumer); j++) { + int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumer, j); + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx); + while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) { + SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash); + ASSERT(pConsumerEp != NULL); + pConsumerEp->oldConsumerId = pConsumerEp->consumerId; + pConsumerEp->consumerId = pSubConsumer->consumerId; + taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); + // build msg and persist into trans + } + } + ASSERT(taosArrayGetSize(unassignedVgStash) == 0); + + // send msg to vnode + // log rebalance statistics + SSdbRaw *pSubRaw = mndSubscribeActionEncode(pSub); + sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); + mndTransAppendRedolog(pTrans, pSubRaw); + } + mndReleaseSubscribe(pMnode, pSub); + } + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return -1; + } + + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return 0; +} + +#if 0 + //update consumer status for the subscribption for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); int64_t consumerId = pCEp->consumerId; - if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { + if (pCEp->status != -1) { + int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1); + if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) { + continue; + } // put consumer into lostConsumer - taosArrayPush(pSub->lostConsumer, pCEp); + SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp); + lostConsumer->qmsg = NULL; // put vg into unassigned taosArrayPush(pSub->unassignedVg, pCEp); // remove from assigned @@ -192,91 +432,76 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { #endif } } - if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) { + // no available consumer, skip rebalance + if (taosArrayGetSize(pSub->availConsumer) == 0) { + continue; + } + taosArrayGet(pSub->availConsumer, 0); + // rebalance condition1 : have unassigned vg + // assign vg to a consumer, trying to find the least assigned one + if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) { char *topic = NULL; char *cgroup = NULL; mndSplitSubscribeKey(pSub->key, &topic, &cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - - // create trans STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); for (int32_t i = 0; i < sz; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); + pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); + SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); - int64_t oldConsumerId = pCEp->consumerId; + pCEp->oldConsumerId = pCEp->consumerId; pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); - pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); pConsumer->epoch++; - /*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ - /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ - /*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/ + SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + sdbWrite(pMnode->pSdb, pConsumerRaw); mndReleaseConsumer(pMnode, pConsumer); - // build msg - - SMqSetCVgReq req = {0}; - strcpy(req.cgroup, cgroup); - strcpy(req.topicName, topic); - req.sql = pTopic->sql; - req.logicalPlan = pTopic->logicalPlan; - req.physicalPlan = pTopic->physicalPlan; - req.qmsg = pCEp->qmsg; - req.oldConsumerId = oldConsumerId; - req.newConsumerId = consumerId; - req.vgId = pCEp->vgId; - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *buf = malloc(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - SMsgHead *pMsgHead = (SMsgHead *)buf; - - pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); - pMsgHead->vgId = htonl(pCEp->vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqSetCVgReq(&abuf, &req); + void* msg; + int32_t msgLen; + mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic); // persist msg - // TODO: no need for txn STransAction action = {0}; action.epSet = pCEp->epSet; - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + tlen; + action.pCont = msg; + action.contLen = sizeof(SMsgHead) + msgLen; action.msgType = TDMT_VND_MQ_SET_CONN; mndTransAppendRedoAction(pTrans, &action); - // persist raw + // persist data SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); - - tfree(topic); - tfree(cgroup); } + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); } mndReleaseTopic(pMnode, pTopic); mndTransDrop(pTrans); + tfree(topic); + tfree(cgroup); } - pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + // rebalance condition2 : imbalance assignment } return 0; } +#endif -static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { - // convert phyplan to dag +static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SArray *pArray = NULL; SArray *inner = taosArrayGet(pDag->pSubplans, 0); SSubplan *plan = taosArrayGetP(inner, 0); - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; + SArray *unassignedVg = pSub->unassignedVg; void *pIter = NULL; while (1) { @@ -284,6 +509,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas if (pIter == NULL) break; if (pVgroup->dbUid != pTopic->dbUid) continue; + pSub->vgNum++; plan->execNode.nodeId = pVgroup->vgId; plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); @@ -298,73 +524,66 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } - SMqConsumerEp CEp = {0}; - CEp.status = 0; - CEp.consumerId = -1; - CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; + SMqConsumerEp consumerEp = {0}; + consumerEp.status = 0; + consumerEp.consumerId = -1; STaskInfo *pTaskInfo = taosArrayGet(pArray, 0); - CEp.epSet = pTaskInfo->addr.epset; - CEp.vgId = pTaskInfo->addr.nodeId; + consumerEp.epSet = pTaskInfo->addr.epset; + consumerEp.vgId = pTaskInfo->addr.nodeId; - ASSERT(CEp.vgId == pVgroup->vgId); - CEp.qmsg = strdup(pTaskInfo->msg->msg); - taosArrayPush(unassignedVg, &CEp); + ASSERT(consumerEp.vgId == pVgroup->vgId); + consumerEp.qmsg = strdup(pTaskInfo->msg->msg); + taosArrayPush(unassignedVg, &consumerEp); // TODO: free taskInfo taosArrayDestroy(pArray); - - /*SEpSet *pEpSet = &plan->execNode.epset;*/ - /*pEpSet->inUse = 0;*/ - /*addEpIntoEpSet(pEpSet, "localhost", 6030);*/ } /*qDestroyQueryDag(pDag);*/ return 0; } -static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp, - int64_t oldConsumerId) { - int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); - for (int32_t i = 0; i < sz; i++) { - int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - SMqSetCVgReq req = { - .vgId = vgId, - .oldConsumerId = oldConsumerId, - .newConsumerId = pConsumer->consumerId, - }; - strcpy(req.cgroup, pConsumer->cgroup); - strcpy(req.topicName, pTopic->name); - req.sql = pTopic->sql; - req.logicalPlan = pTopic->logicalPlan; - req.physicalPlan = pTopic->physicalPlan; - req.qmsg = pCEp->qmsg; - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *buf = malloc(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } +static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pConsumerEp) { + int32_t vgId = pConsumerEp->vgId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + + SMqSetCVgReq req = { + .vgId = vgId, + .oldConsumerId = pConsumerEp->oldConsumerId, + .newConsumerId = pConsumerEp->consumerId, + .sql = pTopic->sql, + .logicalPlan = pTopic->logicalPlan, + .physicalPlan = pTopic->physicalPlan, + .qmsg = pConsumerEp->qmsg, + }; + + strcpy(req.cgroup, cgroup); + strcpy(req.topicName, pTopic->name); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - SMsgHead *pMsgHead = (SMsgHead *)buf; + SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); - pMsgHead->vgId = htonl(vgId); + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqSetCVgReq(&abuf, &req); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSMqSetCVgReq(&abuf, &req); - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_SET_CONN; + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; - mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(buf); - return -1; - } + mndReleaseVgroup(pMnode, pVgObj); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(buf); + return -1; } return 0; } @@ -373,7 +592,7 @@ void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void* buf = NULL; + void *buf = NULL; int32_t tlen = tEncodeSubscribeObj(NULL, pSub); int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; @@ -408,7 +627,7 @@ SUB_ENCODE_OVER: static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; - void* buf = NULL; + void *buf = NULL; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; @@ -443,7 +662,6 @@ SUB_DECODE_OVER: tfree(buf); if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); - // TODO free subscribeobj tfree(pRow); return NULL; } @@ -467,7 +685,7 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc return 0; } -static char *mndMakeSubscribeKey(char *cgroup, char *topicName) { +static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); if (key == NULL) { return NULL; @@ -501,8 +719,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SCMSubscribeReq subscribe; tDeserializeSCMSubscribeReq(msgStr, &subscribe); int64_t consumerId = subscribe.consumerId; - char *consumerGroup = subscribe.consumerGroup; - int32_t cgroupLen = strlen(consumerGroup); + char *cgroup = subscribe.consumerGroup; SArray *newSub = subscribe.topicNames; int newTopicNum = subscribe.topicNum; @@ -511,24 +728,18 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SArray *oldSub = NULL; int oldTopicNum = 0; + bool createConsumer = false; // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { // create consumer - pConsumer = malloc(sizeof(SMqConsumerObj)); - if (pConsumer == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pConsumer->epoch = 1; - pConsumer->consumerId = consumerId; - strcpy(pConsumer->cgroup, consumerGroup); - taosInitRWLatch(&pConsumer->lock); + pConsumer = mndCreateConsumer(consumerId, cgroup); + createConsumer = true; } else { pConsumer->epoch++; oldSub = pConsumer->topics; } - pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); + pConsumer->topics = newSub; if (oldSub != NULL) { oldTopicNum = taosArrayGetSize(oldSub); @@ -546,14 +757,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { char *oldTopicName = NULL; if (i >= newTopicNum) { // encode unset topic msg to all vnodes related to that topic - oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; + oldTopicName = taosArrayGetP(oldSub, j); j++; } else if (j >= oldTopicNum) { newTopicName = taosArrayGetP(newSub, i); i++; } else { newTopicName = taosArrayGetP(newSub, i); - oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; + oldTopicName = taosArrayGetP(oldSub, j); int comp = compareLenPrefixedStr(newTopicName, oldTopicName); if (comp == 0) { @@ -572,54 +783,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } if (oldTopicName != NULL) { -#if 0 - // cancel subscribe of that old topic - ASSERT(pNewTopic == NULL); - char *oldTopicName = pOldTopic->name; - SList *vgroups = pOldTopic->vgroups; - SListIter iter; - tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); - SListNode *pn; - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); - ASSERT(pTopic != NULL); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - while ((pn = tdListNext(&iter)) != NULL) { - int32_t vgId = *(int64_t *)pn->data; - // acquire and get epset - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO what time to release? - if (pVgObj == NULL) { - // TODO handle error - continue; - } - // build reset msg - void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); - // TODO:serialize - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = pMqVgSetReq; - action.contLen = 0; // TODO - action.msgType = TDMT_VND_MQ_SET_CONN; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMqVgSetReq); - mndTransDrop(pTrans); - // TODO free - return -1; + ASSERT(newTopicName == NULL); + + // cancel subscribe of old topic + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); + ASSERT(pSub); + int csz = taosArrayGetSize(pSub->consumers); + for (int ci = 0; ci < csz; ci++) { + SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); + if (pSubConsumer->consumerId == consumerId) { + int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + for (int vgi = 0; vgi < vgsz; vgi++) { + SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); + mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); + } + break; } } - // delete data in mnode - taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); - mndReleaseSubscribe(pMnode, pSub); - mndReleaseTopic(pMnode, pTopic); -#endif + pSub->status = MQ_SUBSCRIBE_STATUS__DELETED; } else if (newTopicName != NULL) { - // save subscribe info to mnode ASSERT(oldTopicName == NULL); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); @@ -628,111 +810,53 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { continue; } - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); - bool create = false; + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); + bool createSub = false; if (pSub == NULL) { - mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName); - pSub = tNewSubscribeObj(); - if (pSub == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - char *key = mndMakeSubscribeKey(consumerGroup, newTopicName); - if (key == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - strcpy(pSub->key, key); - free(key); - // set unassigned vg - if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) { - // TODO: free memory - return -1; - } - // TODO: disable alter - create = true; + mDebug("create new subscription by consumer %ld, group: %s, topic %s", consumerId, cgroup, newTopicName); + pSub = mndCreateSubscription(pMnode, pTopic, cgroup); + createSub = true; } - taosArrayPush(pSub->availConsumer, &consumerId); - - int64_t oldConsumerId; - SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub, &oldConsumerId); - taosArrayPush(pConsumer->topics, pConsumerTopic); - - if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { - ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1); - int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); - SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned); - if (pCEp->vgId == vgId) { - if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp, oldConsumerId) < 0) { - // TODO - return -1; - } - } - // send setmsg to vnode + + SMqSubConsumer mqSubConsumer; + mqSubConsumer.consumerId = consumerId; + mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp)); + taosArrayPush(pSub->consumers, &mqSubConsumer); + + // if have un assigned vg, assign one to the consumer + if (taosArrayGetSize(pSub->unassignedVg) > 0) { + SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); + pConsumerEp->oldConsumerId = pConsumerEp->consumerId; + pConsumerEp->consumerId = consumerId; + taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp); + mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); } SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); - if (!create) mndReleaseSubscribe(pMnode, pSub); -#if 0 - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - if (pGroup == NULL) { - // add new group - pGroup = malloc(sizeof(SMqCGroup)); - if (pGroup == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->consumerIds = tdListNew(sizeof(int64_t)); - if (pGroup->consumerIds == NULL) { - free(pGroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->status = 0; - // add into cgroups - taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); - } - /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ - // put the consumer into list - // rebalance will be triggered by timer - tdListAppend(pGroup->consumerIds, &consumerId); - - SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); - sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); - // TODO: error handling - mndTransAppendRedolog(pTrans, pTopicRaw); - -#endif - /*mndReleaseTopic(pMnode, pTopic);*/ - /*mndReleaseSubscribe(pMnode, pSub);*/ + if (!createSub) mndReleaseSubscribe(pMnode, pSub); + mndReleaseTopic(pMnode, pTopic); } } - // part3. persist consumerObj - // destroy old sub - if (oldSub) taosArrayDestroy(oldSub); - // put new sub into consumerobj + if (oldSub) taosArrayDestroyEx(oldSub, free); // persist consumerObj SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - // TODO: error handling mndTransAppendRedolog(pTrans, pConsumerRaw); if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - if (newSub) taosArrayDestroy(newSub); + mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - /*mndReleaseConsumer(pMnode, pConsumer);*/ + if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); return -1; } - if (newSub) taosArrayDestroy(newSub); mndTransDrop(pTrans); - /*mndReleaseConsumer(pMnode, pConsumer);*/ + if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } @@ -741,146 +865,6 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - STableInfoReq *pInfo = pMsg->rpcMsg.pCont; - - mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname); - -#if 0 - SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); - if (pConsumer == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_CONSUMER; - mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - taosRLockLatch(&pConsumer->lock); - int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; - int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - - STableMetaRsp *pMeta = rpcMallocCont(contLen); - if (pMeta == NULL) { - taosRUnLockLatch(&pConsumer->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseConsumer(pMnode, pConsumer); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); - pMeta->numOfTags = htonl(pConsumer->numOfTags); - pMeta->numOfColumns = htonl(pConsumer->numOfColumns); - pMeta->precision = pDb->cfg.precision; - pMeta->tableType = TSDB_SUPER_TABLE; - pMeta->update = pDb->cfg.update; - pMeta->sversion = htonl(pConsumer->version); - pMeta->tuid = htonl(pConsumer->uid); - - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pMeta->pSchema[i]; - SSchema *pSrcSchema = &pConsumer->pSchema[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = htonl(pSrcSchema->colId); - pSchema->bytes = htonl(pSrcSchema->bytes); - } - taosRUnLockLatch(&pConsumer->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseConsumer(pMnode, pConsumer); - - pMsg->pCont = pMeta; - pMsg->contLen = contLen; - - mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); -#endif - return 0; -} - -static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { - SSdb *pSdb = pMnode->pSdb; - - SDbObj *pDb = mndAcquireDb(pMnode, dbName); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; - } - - int32_t numOfConsumers = 0; - void *pIter = NULL; - while (1) { - SMqConsumerObj *pConsumer = NULL; - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; - - numOfConsumers++; - - sdbRelease(pSdb, pConsumer); - } - - *pNumOfConsumers = numOfConsumers; - return 0; -} - -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - - if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { - return -1; - } - - int32_t cols = 0; - SSchema *pSchema = pMeta->pSchema; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "create_time"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = htonl(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htonl(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) { - pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - } - - pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - strcpy(pMeta->tbFname, mndShowStr(pShow->type)); - - return 0; -} - static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index e57ee3eabc13b4fbedf3ec1dc6486a12a3fbb273..aef58b7acca6be77d8e45eae452dd4c4c1865ebe 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -76,7 +76,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { if (mndIsMaster(pMnode)) { SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; - pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); + pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer); @@ -249,6 +249,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->pDnode = pOption->pDnode; pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp; + pMnode->putReqToMReadQFp = pOption->putReqToMReadQFp; pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp; pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp; pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp; diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index 3fb235e7dda1e9bd53164d1a68de34aeb1dd65cb..54a0b830b2a55414196570141fab3283681b6eaa 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -124,7 +124,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh); int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds); +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0f2d711a79f467fee792e197cb5a0b55a912cdc2..37dd4c6ca811c3afc3f9d5530e163d0fa054becf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1327,7 +1327,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; TSKEY keyLimit; - int16_t colId = 0; + int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID; SMergeInfo mInfo; SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; SBlock block, supBlock; @@ -1605,4 +1605,4 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p // } // return 0; -// } \ No newline at end of file +// } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 3dcbb7888b767988a13bf023daf68b78050f379e..814bf43f75b3a4a649e0a1fed6ce853aaccfc8de 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -22,7 +22,7 @@ static void tsdbResetReadFile(SReadH *pReadh); static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bufferSize); -static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds, int numOfColIds); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); @@ -271,7 +271,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { return 0; } -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, int16_t *colIds, int numOfColsIds) { +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds) { ASSERT(pBlock->numOfSubBlocks > 0); int8_t update = pReadh->pRepo->config.update; @@ -472,7 +472,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat continue; } - int16_t tcolId = 0; + int16_t tcolId = PRIMARYKEY_TIMESTAMP_COL_ID; uint32_t toffset = TSDB_KEY_COL_OFFSET; int32_t tlen = pBlock->keyLen; @@ -548,7 +548,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return 0; } -static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, +static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds, int numOfColIds) { ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1); ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID); diff --git a/source/dnode/vnode/src/vnd/vnodeBufferPool.c b/source/dnode/vnode/src/vnd/vnodeBufferPool.c index 434498eef59bdbaa1fa95ec7d6c63f262c2057bc..9d1877bdb7dc3b5b4b0b71ea92e85d2056fc60c5 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufferPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufferPool.c @@ -185,6 +185,7 @@ static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) { free(pMA); if (--pVMA->_ref.val == 0) { TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA); + vmaReset(pVMA); TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA); } -} \ No newline at end of file +}