提交 c790b4df 编写于 作者: L Liu Jicong

refacor(tmq): extract unassigned vg out of hash

上级 e119a7ad
......@@ -188,6 +188,7 @@ typedef struct SRequestSendRecvBody {
typedef struct {
int8_t resType;
int32_t code;
char topic[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
......@@ -310,9 +311,8 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
#ifdef __cplusplus
}
......
......@@ -110,16 +110,23 @@ int taos_errno(TAOS_RES *tres) {
return terrno;
}
if (TD_RES_TMQ(tres)) {
return 0;
}
return ((SRequestObj *)tres)->code;
}
const char *taos_errstr(TAOS_RES *res) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
if (res == NULL) {
return (const char *)tstrerror(terrno);
}
if (TD_RES_TMQ(res)) {
return "success";
}
SRequestObj *pRequest = (SRequestObj *)res;
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
return pRequest->msgBuf;
} else {
......@@ -131,7 +138,7 @@ void taos_free_result(TAOS_RES *res) {
if (NULL == res) {
return;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
destroyRequest(pRequest);
......@@ -632,9 +639,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return stmtSetTbName(stmt, name);
}
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) {
return taos_stmt_set_tbname(stmt, name);
}
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
......@@ -648,7 +653,7 @@ int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, -1);
}
......@@ -696,7 +701,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, in
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtBindBatch(stmt, bind, colIdx);
}
......@@ -750,9 +755,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
return stmtUseResult(stmt);
}
char *taos_stmt_errstr(TAOS_STMT *stmt) {
return (char *)stmtErrstr(stmt);
}
char *taos_stmt_errstr(TAOS_STMT *stmt) { return (char *)stmtErrstr(stmt); }
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
if (stmt == NULL) {
......
......@@ -514,12 +514,12 @@ void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
typedef struct {
int64_t consumerId; // -1 for unassigned
SArray* vgs; // SArray<SMqVgEp*>
} SMqConsumerEpInSub;
} SMqConsumerEp;
SMqConsumerEpInSub* tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub* pEpInSub);
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub* pEpInSub);
int32_t tEncodeSMqConsumerEpInSub(void** buf, const SMqConsumerEpInSub* pEpInSub);
void* tDecodeSMqConsumerEpInSub(const void* buf, SMqConsumerEpInSub* pEpInSub);
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
void tDeleteSMqConsumerEp(SMqConsumerEp* pEp);
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
......@@ -529,9 +529,8 @@ typedef struct {
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
// TODO put -1 into unassignVgs
// SArray* unassignedVgs;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>
} SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
......@@ -542,7 +541,7 @@ void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);
typedef struct {
int32_t epoch;
SArray* consumers; // SArray<SMqConsumerEpInSub*>
SArray* consumers; // SArray<SMqConsumerEp*>
} SMqSubActionLogEntry;
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
......
......@@ -302,8 +302,8 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic);
// 2.2 iterate all vg assigned to the consumer of that topic
SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pEpInSub->vgs);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
if (topicEp.vgs == NULL) {
......@@ -313,7 +313,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
}
for (int32_t j = 0; j < vgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
char offsetKey[TSDB_PARTITION_KEY_LEN];
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
// 2.2.1 build vg ep
......
......@@ -211,42 +211,47 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
return (void *)buf;
}
SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) {
SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
if (pEpInSubNew == NULL) return NULL;
pEpInSubNew->consumerId = pEpInSub->consumerId;
pEpInSubNew->vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp);
return pEpInSubNew;
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
if (pConsumerEpNew == NULL) return NULL;
pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpOld->vgs, (FCopy)tCloneSMqVgEp);
return pConsumerEpNew;
}
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) {
taosArrayDestroyEx(pEpInSub->vgs, (FDelete)tDeleteSMqVgEp);
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) {
//
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) {
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId);
int32_t sz = taosArrayGetSize(pEpInSub->vgs);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
#if 0
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i);
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
tlen += tEncodeSMqVgEp(buf, pVgEp);
}
/*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/
#endif
return tlen;
}
void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId);
/*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
#if 0
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pEpInSub->vgs = taosArrayInit(sz, sizeof(void *));
pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
buf = tDecodeSMqVgEp(buf, pVgEp);
taosArrayPush(pEpInSub->vgs, &pVgEp);
taosArrayPush(pConsumerEp->vgs, &pVgEp);
}
#endif
return (void *)buf;
}
......@@ -258,13 +263,11 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = 0;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set free fp
SMqConsumerEpInSub epInSub = {
.consumerId = -1,
.vgs = taosArrayInit(0, sizeof(void *)),
};
int64_t unexistKey = -1;
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
// TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));
return pSubNew;
}
......@@ -281,25 +284,27 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
void *pIter = NULL;
SMqConsumerEpInSub *pEpInSub = NULL;
// TODO set hash free fp
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
void *pIter = NULL;
SMqConsumerEp *pConsumerEp = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pEpInSub = (SMqConsumerEpInSub *)pIter;
SMqConsumerEpInSub newEp = {
.consumerId = pEpInSub->consumerId,
.vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp),
pConsumerEp = (SMqConsumerEp *)pIter;
SMqConsumerEp newEp = {
.consumerId = pConsumerEp->consumerId,
.vgs = taosArrayDeepCopy(pConsumerEp->vgs, (FCopy)tCloneSMqVgEp),
};
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEpInSub));
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
}
pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
return pSubNew;
}
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
/*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/
taosHashCleanup(pSub->consumerHash);
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
......@@ -319,12 +324,12 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
tlen += tEncodeSMqConsumerEpInSub(buf, pEpInSub);
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
cnt++;
}
ASSERT(cnt == sz);
/*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
return tlen;
}
......@@ -342,13 +347,12 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
for (int32_t i = 0; i < sz; i++) {
/*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/
SMqConsumerEpInSub epInSub = {0};
buf = tDecodeSMqConsumerEpInSub(buf, &epInSub);
taosHashPut(pSub->consumerHash, &epInSub.consumerId, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
SMqConsumerEp consumerEp = {0};
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
}
/*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
return (void *)buf;
}
......@@ -356,12 +360,12 @@ SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
if (pEntryNew == NULL) return NULL;
pEntryNew->epoch = pEntry->epoch;
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEpInSub);
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEp);
return pEntryNew;
}
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEpInSub);
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
}
int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
......@@ -381,12 +385,12 @@ SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
if (pLogNew == NULL) return pLogNew;
memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEpInSub);
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEp);
return pLogNew;
}
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEpInSub);
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
}
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
......
......@@ -504,11 +504,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
plan = nodesListGetNode(inner->pNodeList, 0);
}
int64_t unexistKey = -1;
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
ASSERT(pSub->unassignedVgs);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
void* pIter = NULL;
while (1) {
......@@ -524,7 +521,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
pVgEp->vgId = pVgroup->vgId;
taosArrayPush(pEpInSub->vgs, &pVgEp);
taosArrayPush(pSub->unassignedVgs, &pVgEp);
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
......@@ -543,17 +540,11 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
} else {
pVgEp->qmsg = strdup("");
}
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub->vgs->size > 0);
ASSERT(pSub->unassignedVgs->size > 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
qDestroyQueryPlan(pPlan);
......
......@@ -85,7 +85,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->withSchema = pTopic->withSchema;
pSub->withTag = pTopic->withTag;
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
ASSERT(pSub->unassignedVgs->size == 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
tDeleteSubscribeObj(pSub);
......@@ -93,7 +94,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return NULL;
}
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
ASSERT(pSub->unassignedVgs->size > 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
return pSub;
}
......@@ -185,7 +187,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pInput->pTopic != NULL) {
// create subscribe
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 1);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0);
} else {
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
}
......@@ -196,21 +198,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
// 2. check and get actual removed consumers, put their vg into hash
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
ASSERT(pEpInSub);
if (pEpInSub) {
ASSERT(consumerId == pEpInSub->consumerId);
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
ASSERT(pConsumerEp);
if (pConsumerEp) {
ASSERT(consumerId == pConsumerEp->consumerId);
actualRemoved++;
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SMqRebOutputVg outputVg = {
.oldConsumerId = consumerId,
.newConsumerId = -1,
......@@ -224,16 +225,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
ASSERT(removedNum == actualRemoved);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
// if previously no consumer, there are vgs not assigned
{
int64_t unexistKey = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub);
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
for (int32_t i = 0; i < consumerVgNum; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
SMqRebOutputVg rebOutput = {
.oldConsumerId = -1,
.newConsumerId = -1,
......@@ -246,7 +243,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// 3. calc vg number of each consumer
int32_t oldSz = 0;
if (pInput->pOldSub) {
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash) - 1;
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash);
}
int32_t afterRebConsumerNum =
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
......@@ -264,23 +261,22 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
// all old consumers still existing are touched
// TODO optimize: touch only consumer whose vgs changed
taosArrayPush(pOutput->touchedConsumers, &pEpInSub->consumerId);
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) {
continue;
} else {
// pop until equal minVg + 1
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt + 1) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pEpInSub->consumerId,
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
......@@ -290,10 +286,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
} else {
// pop until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pEpInSub->consumerId,
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
......@@ -309,12 +305,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerEpInSub newConsumerEp;
SMqConsumerEp newConsumerEp;
newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp,
sizeof(SMqConsumerEpInSub));
/*SMqConsumerEpInSub *pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
/*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
/*ASSERT(pTestNew->consumerId == consumerId);*/
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
taosArrayPush(pOutput->newConsumers, &consumerId);
......@@ -329,25 +324,24 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
// push until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) {
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
}
// 7. handle unassigned vg
if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) {
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
// if has consumer, assign all left vg
while (1) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
......@@ -355,20 +349,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
if (pEpInSub->consumerId == -1) continue;
ASSERT(pEpInSub->consumerId > 0);
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
} else {
// if all consumer is removed, put all vg into unassigned
int64_t unexistKey = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
ASSERT(pEpInSub);
ASSERT(pEpInSub->consumerId == -1);
pIter = NULL;
SMqRebOutputVg *pRebOutput = NULL;
while (1) {
......@@ -376,7 +364,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pIter == NULL) break;
pRebOutput = (SMqRebOutputVg *)pIter;
ASSERT(pRebOutput->newConsumerId == -1);
taosArrayPush(pEpInSub->vgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput);
}
}
......@@ -512,6 +500,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
// TODO replace assert with error check
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) {
......@@ -631,6 +620,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
pOldSub->consumerHash = pNewSub->consumerHash;
pNewSub->consumerHash = tmp;
SArray *tmp1 = pOldSub->unassignedVgs;
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
pNewSub->unassignedVgs = tmp1;
taosWUnLockLatch(&pOldSub->lock);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册