diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 41448a439178f1aef689373eaa6d7f02b60a9187..994cda2a06663ec0414112dc9ddb241f2c00836d 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -188,6 +188,7 @@ typedef struct SRequestSendRecvBody { typedef struct { int8_t resType; + int32_t code; char topic[TSDB_TOPIC_FNAME_LEN]; int32_t vgId; SSchemaWrapper schema; @@ -310,9 +311,8 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v void hbMgrInitMqHbRspHandle(); SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery); -int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); -int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); - +int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); +int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList); #ifdef __cplusplus } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0e7563bb13ba45c340089161dc7b2f073a1d18cf..818436b41177aa65b89a95ccbc5ae46074ade49c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -110,16 +110,23 @@ int taos_errno(TAOS_RES *tres) { return terrno; } + if (TD_RES_TMQ(tres)) { + return 0; + } + return ((SRequestObj *)tres)->code; } const char *taos_errstr(TAOS_RES *res) { - SRequestObj *pRequest = (SRequestObj *)res; - - if (pRequest == NULL) { + if (res == NULL) { return (const char *)tstrerror(terrno); } + if (TD_RES_TMQ(res)) { + return "success"; + } + + SRequestObj *pRequest = (SRequestObj *)res; if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) { return pRequest->msgBuf; } else { @@ -131,7 +138,7 @@ void taos_free_result(TAOS_RES *res) { if (NULL == res) { return; } - + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; destroyRequest(pRequest); @@ -632,9 +639,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { return stmtSetTbName(stmt, name); } -int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { - return taos_stmt_set_tbname(stmt, name); -} +int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); } int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { if (stmt == NULL || bind == NULL) { @@ -648,7 +653,7 @@ int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { terrno = TSDB_CODE_INVALID_PARA; return terrno; } - + return stmtBindBatch(stmt, bind, -1); } @@ -696,7 +701,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, in terrno = TSDB_CODE_INVALID_PARA; return terrno; } - + return stmtBindBatch(stmt, bind, colIdx); } @@ -750,9 +755,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { return stmtUseResult(stmt); } -char *taos_stmt_errstr(TAOS_STMT *stmt) { - return (char *)stmtErrstr(stmt); -} +char *taos_stmt_errstr(TAOS_STMT *stmt) { return (char *)stmtErrstr(stmt); } int taos_stmt_affected_rows(TAOS_STMT *stmt) { if (stmt == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a7e84b5bba6a661ec75e2ea683d7d1b36751b246..1d0f525cb9130a72572d74d4f24f347fc71ddb43 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -514,12 +514,12 @@ void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); typedef struct { int64_t consumerId; // -1 for unassigned SArray* vgs; // SArray -} SMqConsumerEpInSub; +} SMqConsumerEp; -SMqConsumerEpInSub* tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub* pEpInSub); -void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub* pEpInSub); -int32_t tEncodeSMqConsumerEpInSub(void** buf, const SMqConsumerEpInSub* pEpInSub); -void* tDecodeSMqConsumerEpInSub(const void* buf, SMqConsumerEpInSub* pEpInSub); +SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp); +void tDeleteSMqConsumerEp(SMqConsumerEp* pEp); +int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp); +void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp); typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -529,9 +529,8 @@ typedef struct { int8_t withTbName; int8_t withSchema; int8_t withTag; - SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub - // TODO put -1 into unassignVgs - // SArray* unassignedVgs; + SHashObj* consumerHash; // consumerId -> SMqConsumerEp + SArray* unassignedVgs; // SArray } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); @@ -542,7 +541,7 @@ void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub); typedef struct { int32_t epoch; - SArray* consumers; // SArray + SArray* consumers; // SArray } SMqSubActionLogEntry; SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index ac75baeb35b38b3f938e9423053228b7aa610ff7..be584848a38477cafd0915a545b7eab19f3b168b 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -302,8 +302,8 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { mndReleaseTopic(pMnode, pTopic); // 2.2 iterate all vg assigned to the consumer of that topic - SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); - int32_t vgNum = taosArrayGetSize(pEpInSub->vgs); + SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); + int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs); topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); if (topicEp.vgs == NULL) { @@ -313,7 +313,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { } for (int32_t j = 0; j < vgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j); + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); char offsetKey[TSDB_PARTITION_KEY_LEN]; mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); // 2.2.1 build vg ep diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 767e59e4f6cf2931a1b028777e4af6421b7456a4..2f167b72d95ac730cda239840252276a39a0d2a4 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -211,42 +211,47 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { return (void *)buf; } -SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) { - SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub)); - if (pEpInSubNew == NULL) return NULL; - pEpInSubNew->consumerId = pEpInSub->consumerId; - pEpInSubNew->vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp); - return pEpInSubNew; +SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { + SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); + if (pConsumerEpNew == NULL) return NULL; + pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; + pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpOld->vgs, (FCopy)tCloneSMqVgEp); + return pConsumerEpNew; } -void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) { - taosArrayDestroyEx(pEpInSub->vgs, (FDelete)tDeleteSMqVgEp); +void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) { + // + taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); } -int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) { +int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId); - int32_t sz = taosArrayGetSize(pEpInSub->vgs); + tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); +#if 0 + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i); + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); tlen += tEncodeSMqVgEp(buf, pVgEp); } - /*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/ +#endif return tlen; } -void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) { - buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId); - /*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/ +void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { + buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp)); +#if 0 int32_t sz; buf = taosDecodeFixedI32(buf, &sz); - pEpInSub->vgs = taosArrayInit(sz, sizeof(void *)); + pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); buf = tDecodeSMqVgEp(buf, pVgEp); - taosArrayPush(pEpInSub->vgs, &pVgEp); + taosArrayPush(pConsumerEp->vgs, &pVgEp); } +#endif return (void *)buf; } @@ -258,13 +263,11 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { taosInitRWLatch(&pSubNew->lock); pSubNew->vgNum = 0; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - // TODO set free fp - SMqConsumerEpInSub epInSub = { - .consumerId = -1, - .vgs = taosArrayInit(0, sizeof(void *)), - }; - int64_t unexistKey = -1; - taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub)); + // TODO set hash free fp + /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ + + pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *)); + return pSubNew; } @@ -281,25 +284,27 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { pSubNew->vgNum = pSub->vgNum; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - /*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/ - void *pIter = NULL; - SMqConsumerEpInSub *pEpInSub = NULL; + // TODO set hash free fp + /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ + void *pIter = NULL; + SMqConsumerEp *pConsumerEp = NULL; while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; - pEpInSub = (SMqConsumerEpInSub *)pIter; - SMqConsumerEpInSub newEp = { - .consumerId = pEpInSub->consumerId, - .vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp), + pConsumerEp = (SMqConsumerEp *)pIter; + SMqConsumerEp newEp = { + .consumerId = pConsumerEp->consumerId, + .vgs = taosArrayDeepCopy(pConsumerEp->vgs, (FCopy)tCloneSMqVgEp), }; - taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEpInSub)); + taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } + pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp); return pSubNew; } void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { - /*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/ taosHashCleanup(pSub->consumerHash); + taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); } int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { @@ -319,12 +324,12 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { while (1) { pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; - SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; - tlen += tEncodeSMqConsumerEpInSub(buf, pEpInSub); + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + tlen += tEncodeSMqConsumerEp(buf, pConsumerEp); cnt++; } ASSERT(cnt == sz); - /*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/ + tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); return tlen; } @@ -342,13 +347,12 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); for (int32_t i = 0; i < sz; i++) { - /*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/ - SMqConsumerEpInSub epInSub = {0}; - buf = tDecodeSMqConsumerEpInSub(buf, &epInSub); - taosHashPut(pSub->consumerHash, &epInSub.consumerId, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub)); + SMqConsumerEp consumerEp = {0}; + buf = tDecodeSMqConsumerEp(buf, &consumerEp); + taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)); } - /*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/ + buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); return (void *)buf; } @@ -356,12 +360,12 @@ SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); if (pEntryNew == NULL) return NULL; pEntryNew->epoch = pEntry->epoch; - pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEpInSub); + pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEp); return pEntryNew; } void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { - taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEpInSub); + taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); } int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { @@ -381,12 +385,12 @@ SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); if (pLogNew == NULL) return pLogNew; memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); - pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEpInSub); + pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEp); return pLogNew; } void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { - taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEpInSub); + taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); } int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 06aa3cec0705b5aa1cac8a3b49d140354d07e4eb..4976bdefc7a6128ffe13bb59d137c64693797536 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -504,11 +504,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib plan = nodesListGetNode(inner->pNodeList, 0); } - int64_t unexistKey = -1; - SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t)); - ASSERT(pEpInSub); - - ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + ASSERT(pSub->unassignedVgs); + ASSERT(taosHashGetSize(pSub->consumerHash) == 0); void* pIter = NULL; while (1) { @@ -524,7 +521,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); pVgEp->vgId = pVgroup->vgId; - taosArrayPush(pEpInSub->vgs, &pVgEp); + taosArrayPush(pSub->unassignedVgs, &pVgEp); mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId); @@ -543,17 +540,11 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib } else { pVgEp->qmsg = strdup(""); } - - ASSERT(taosHashGetSize(pSub->consumerHash) == 1); - - /*taosArrayPush(pSub->unassignedVg, &consumerEp);*/ } - pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t)); - - ASSERT(pEpInSub->vgs->size > 0); + ASSERT(pSub->unassignedVgs->size > 0); - ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + ASSERT(taosHashGetSize(pSub->consumerHash) == 0); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 6a1994d7b86b28fdfed346338ce0254312554a7e..f271c1b5655e63794de5379efd65ee561edbfee5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -85,7 +85,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, pSub->withSchema = pTopic->withSchema; pSub->withTag = pTopic->withTag; - ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + ASSERT(pSub->unassignedVgs->size == 0); + ASSERT(taosHashGetSize(pSub->consumerHash) == 0); if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { tDeleteSubscribeObj(pSub); @@ -93,7 +94,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, return NULL; } - ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + ASSERT(pSub->unassignedVgs->size > 0); + ASSERT(taosHashGetSize(pSub->consumerHash) == 0); return pSub; } @@ -185,7 +187,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (pInput->pTopic != NULL) { // create subscribe pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key); - ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 1); + ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0); } else { pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub); } @@ -196,21 +198,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0); // 2. check and get actual removed consumers, put their vg into hash int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; for (int32_t i = 0; i < removedNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); ASSERT(consumerId > 0); - SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); - ASSERT(pEpInSub); - if (pEpInSub) { - ASSERT(consumerId == pEpInSub->consumerId); + SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); + ASSERT(pConsumerEp); + if (pConsumerEp) { + ASSERT(consumerId == pConsumerEp->consumerId); actualRemoved++; - int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs); + int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j); + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqRebOutputVg outputVg = { .oldConsumerId = consumerId, .newConsumerId = -1, @@ -224,16 +225,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } ASSERT(removedNum == actualRemoved); - ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0); // if previously no consumer, there are vgs not assigned { - int64_t unexistKey = -1; - SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t)); - ASSERT(pEpInSub); - int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs); + int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs); for (int32_t i = 0; i < consumerVgNum; i++) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs); + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); SMqRebOutputVg rebOutput = { .oldConsumerId = -1, .newConsumerId = -1, @@ -246,7 +243,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR // 3. calc vg number of each consumer int32_t oldSz = 0; if (pInput->pOldSub) { - oldSz = taosHashGetSize(pInput->pOldSub->consumerHash) - 1; + oldSz = taosHashGetSize(pInput->pOldSub->consumerHash); } int32_t afterRebConsumerNum = oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers); @@ -264,23 +261,22 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; - SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; - if (pEpInSub->consumerId == -1) continue; - ASSERT(pEpInSub->consumerId > 0); - int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs); + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + ASSERT(pConsumerEp->consumerId > 0); + int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); // all old consumers still existing are touched // TODO optimize: touch only consumer whose vgs changed - taosArrayPush(pOutput->touchedConsumers, &pEpInSub->consumerId); + taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { if (consumerVgNum == minVgCnt + 1) { continue; } else { // pop until equal minVg + 1 - while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs); + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); SMqRebOutputVg outputVg = { - .oldConsumerId = pEpInSub->consumerId, + .oldConsumerId = pConsumerEp->consumerId, .newConsumerId = -1, .pVgEp = pVgEp, }; @@ -290,10 +286,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } } else { // pop until equal minVg - while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs); + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); SMqRebOutputVg outputVg = { - .oldConsumerId = pEpInSub->consumerId, + .oldConsumerId = pConsumerEp->consumerId, .newConsumerId = -1, .pVgEp = pVgEp, }; @@ -309,12 +305,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); ASSERT(consumerId > 0); - SMqConsumerEpInSub newConsumerEp; + SMqConsumerEp newConsumerEp; newConsumerEp.consumerId = consumerId; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); - taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, - sizeof(SMqConsumerEpInSub)); - /*SMqConsumerEpInSub *pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/ + taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); + /*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/ /*ASSERT(pTestNew->consumerId == consumerId);*/ /*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/ taosArrayPush(pOutput->newConsumers, &consumerId); @@ -329,25 +324,24 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR while (1) { pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; - SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; - if (pEpInSub->consumerId == -1) continue; - ASSERT(pEpInSub->consumerId > 0); + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + ASSERT(pConsumerEp->consumerId > 0); // push until equal minVg - while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) { + while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { // iter hash and find one vg pRemovedIter = taosHashIterate(pHash, pRemovedIter); ASSERT(pRemovedIter); pRebVg = (SMqRebOutputVg *)pRemovedIter; // push - taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); - pRebVg->newConsumerId = pEpInSub->consumerId; + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); } } // 7. handle unassigned vg - if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) { + if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { // if has consumer, assign all left vg while (1) { pRemovedIter = taosHashIterate(pHash, pRemovedIter); @@ -355,20 +349,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); ASSERT(pIter); pRebVg = (SMqRebOutputVg *)pRemovedIter; - SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; - if (pEpInSub->consumerId == -1) continue; - ASSERT(pEpInSub->consumerId > 0); - taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); - pRebVg->newConsumerId = pEpInSub->consumerId; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + ASSERT(pConsumerEp->consumerId > 0); + taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); + pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); } } else { // if all consumer is removed, put all vg into unassigned - int64_t unexistKey = -1; - SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t)); - ASSERT(pEpInSub); - ASSERT(pEpInSub->consumerId == -1); - pIter = NULL; SMqRebOutputVg *pRebOutput = NULL; while (1) { @@ -376,7 +364,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (pIter == NULL) break; pRebOutput = (SMqRebOutputVg *)pIter; ASSERT(pRebOutput->newConsumerId == -1); - taosArrayPush(pEpInSub->vgs, &pRebOutput->pVgEp); + taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->rebVgs, pRebOutput); } } @@ -512,6 +500,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // possibly no vg is changed /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ + // TODO replace assert with error check ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (rebInput.pTopic) { @@ -631,6 +620,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc pOldSub->consumerHash = pNewSub->consumerHash; pNewSub->consumerHash = tmp; + SArray *tmp1 = pOldSub->unassignedVgs; + pOldSub->unassignedVgs = pNewSub->unassignedVgs; + pNewSub->unassignedVgs = tmp1; + taosWUnLockLatch(&pOldSub->lock); return 0; }