提交 b7bf0fe1 编写于 作者: L Liu Jicong

refactor(tmq): subscribe and rebalance process

上级 69cb4b5c
......@@ -329,7 +329,7 @@ typedef struct SEpSet {
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
int8_t connType;
......@@ -1290,10 +1290,14 @@ typedef struct {
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
typedef struct {
int64_t consumerId;
} SMqConsumerLostMsg;
typedef struct {
int32_t topicNum;
int64_t consumerId;
char* consumerGroup;
char cgroup[TSDB_CGROUP_LEN];
SArray* topicNames; // SArray<char*>
} SCMSubscribeReq;
......@@ -1301,7 +1305,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup);
tlen += taosEncodeString(buf, pReq->cgroup);
for (int32_t i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i));
......@@ -1312,7 +1316,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup);
buf = taosDecodeStringTo(buf, pReq->cgroup);
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
for (int32_t i = 0; i < pReq->topicNum; i++) {
char* name;
......@@ -1388,10 +1392,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
}
typedef struct {
const char* key;
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
char key[TSDB_SUBSCRIBE_KEY_LEN];
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
......@@ -1399,7 +1403,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
if (pRebSub == NULL) {
goto _err;
}
pRebSub->key = strdup(key);
strcpy(pRebSub->key, key);
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) {
goto _err;
......@@ -1424,6 +1428,7 @@ _err:
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
typedef struct {
int8_t* mqInReb;
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
......@@ -1876,6 +1881,40 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return buf;
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
char* qmsg;
} SMqRebVgReq;
static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeString(buf, pReq->qmsg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeString(buf, &pReq->qmsg);
return (void*)buf;
}
typedef struct {
int8_t reserved;
} SMqRebVgRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
......@@ -2441,7 +2480,10 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
......
......@@ -149,6 +149,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
......@@ -180,6 +181,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_REB, "vnode-mq-mv-rebalance", SMqMVRebReq, SMqMVRebRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CANCEL_CONN, "vnode-mq-mv-cancel-conn", SMqCancelConnReq, SMqCancelConnRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
......
......@@ -92,7 +92,13 @@ extern "C" {
typedef struct SMnode SMnode;
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum {
SDB_KEY_BINARY = 1,
SDB_KEY_INT32 = 2,
SDB_KEY_INT64 = 3,
} EKeyType;
typedef enum {
SDB_STATUS_INIT = 0,
SDB_STATUS_CREATING = 1,
......
......@@ -27,6 +27,11 @@ extern "C" {
typedef int32_t (*__compar_fn_t)(const void *, const void *);
#endif
typedef void *(*FCopy)(void *);
typedef void (*FDelete)(void *);
typedef int32_t (*FEncode)(void **buf, const void *dst);
typedef void *(*FDecode)(const void *buf, void *dst);
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
......
......@@ -279,6 +279,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EB)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
......@@ -41,10 +41,10 @@ extern "C" {
#define TARRAY_GET_START(array) ((array)->pData)
typedef struct SArray {
size_t size;
size_t size;
uint32_t capacity;
uint32_t elemSize;
void* pData;
void* pData;
} SArray;
/**
......@@ -200,23 +200,20 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize);
SArray* taosArrayDup(const SArray* pSrc);
/**
* clear the array (remove all element)
* @param pArray
* deep copy a new array
* @param pSrc
*/
void taosArrayClear(SArray* pArray);
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy);
/**
* destroy array list
* clear the array (remove all element)
* @param pArray
*/
void* taosArrayDestroy(SArray* pArray);
void taosArrayClear(SArray* pArray);
/**
*
* @param pArray
* @param fp
*/
void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp);
void taosArrayDestroyEx(SArray* pArray, FDelete fp);
/**
* sort the array
......@@ -264,6 +261,9 @@ char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
#ifdef __cplusplus
}
#endif
......
......@@ -255,7 +255,12 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
void tmq_list_destroy(tmq_list_t* list) {
SArray* container = &list->container;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
int32_t sz = taosArrayGetSize(container);
for (int32_t i = 0; i < sz; i++) {
char* str = taosArrayGetP(container, i);
taosMemoryFree(str);
}
taosArrayDestroy(container);
}
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
......@@ -496,7 +501,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
strcpy(req.cgroup, tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) {
......
......@@ -121,14 +121,14 @@ int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
return tlen;
}
void *taosDecodeSEpSet(void *buf, SEpSet *pEp) {
void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
return (void *)buf;
}
static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
......@@ -2184,7 +2184,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
// if (tEncodeI8(&encoder, pReq->free) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
tEndEncode(&encoder);
......@@ -2201,7 +2201,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
// if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
// if (tDecodeI8(&decoder, &pReq->free) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
tEndDecode(&decoder);
......
......@@ -22,27 +22,30 @@
extern "C" {
#endif
enum {
MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__ACTIVE,
// MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__MODIFY = 1,
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
// MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__MODIFY
MQ_CONSUMER_STATUS__LOST_IN_REB,
MQ_CONSUMER_STATUS__LOST_REBD,
};
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup);
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
#ifdef __cplusplus
}
#endif
......
......@@ -88,6 +88,7 @@ typedef enum {
TRN_TYPE_CREATE_STREAM = 1019,
TRN_TYPE_DROP_STREAM = 1020,
TRN_TYPE_ALTER_STREAM = 1021,
TRN_TYPE_CONSUMER_LOST = 1022,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
......@@ -506,6 +507,7 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset)
return buf;
}
#if 0
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status;
......@@ -636,6 +638,7 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
pSub->unassignedVg = NULL;
}
}
#endif
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......@@ -654,6 +657,7 @@ typedef struct {
SSchemaWrapper schema;
} SMqTopicObj;
#if 0
typedef struct {
int64_t consumerId;
int64_t connId;
......@@ -708,7 +712,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
......@@ -716,7 +720,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
......@@ -724,6 +728,132 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
return buf;
}
#endif
enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__MODIFY,
};
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
// hbStatus is not applicable to serialization
int32_t hbStatus;
// lock is used for topics update
SRWLatch lock;
SArray* currentTopics; // SArray<char*>
#if 0
SArray* waitingRebTopics; // SArray<char*>
#endif
SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*>
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer);
typedef struct {
int32_t vgId;
char* qmsg;
// char topic[TSDB_TOPIC_FNAME_LEN];
SEpSet epSet;
} SMqVgEp;
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
void tDeleteSMqVgEp(SMqVgEp* pVgEp);
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
typedef struct {
int64_t consumerId; // -1 for unassigned
SArray* vgs; // SArray<SMqVgEp*>
} SMqConsumerEpInSub;
SMqConsumerEpInSub* tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub* pEpInSub);
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub* pEpInSub);
int32_t tEncodeSMqConsumerEpInSub(void** buf, const SMqConsumerEpInSub* pEpInSub);
void* tDecodeSMqConsumerEpInSub(const void* buf, SMqConsumerEpInSub* pEpInSub);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
SRWLatch lock;
int32_t vgNum;
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
} SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);
typedef struct {
int32_t epoch;
SArray* consumers; // SArray<SMqConsumerEpInSub*>
} SMqSubActionLogEntry;
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
SArray* logs; // SArray<SMqSubActionLogEntry*>
} SMqSubActionLogObj;
SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
SRWLatch lock;
SArray* vgs; // SArray<SMqVgEp*>
} SMqConsumerEpObj;
SMqConsumerEpObj* tCloneSMqConsumerEpObj(const SMqConsumerEpObj* pConsumerEp);
void tDeleteSMqConsumerEpObj(SMqConsumerEpObj* pConsumerEp);
int32_t tEncodeSMqConsumerEpObj(void** buf, const SMqConsumerEpObj* pConsumerEp);
void* tDecodeSMqConsumerEpObj(const void* buf, SMqConsumerEpObj* pConsumerEp);
typedef struct {
const SMqSubscribeObj* pOldSub;
const SMqTopicObj* pTopic;
const SMqRebSubscribe* pRebInfo;
} SMqRebInputObj;
typedef struct {
int64_t oldConsumerId;
int64_t newConsumerId;
SMqVgEp* pVgEp;
} SMqRebOutputVg;
#if 0
typedef struct {
int64_t consumerId;
} SMqRebOutputConsumer;
#endif
typedef struct {
SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* touchedConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub;
SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......
......@@ -26,9 +26,11 @@ int32_t mndInitSubscribe(SMnode *pMnode);
void mndCleanupSubscribe(SMnode *pMnode);
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key);
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key);
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
#ifdef __cplusplus
}
#endif
......
......@@ -19,8 +19,10 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
......@@ -28,9 +30,13 @@
#include "tcompare.h"
#include "tname.h"
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT 3
static int8_t mqInRebFlag = 0;
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
......@@ -38,6 +44,11 @@ static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);
static int32_t mndProcessAskEpReq(SNodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);
static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg);
int32_t mndInitConsumer(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CONSUMER,
.keyType = SDB_KEY_INT64,
......@@ -47,25 +58,392 @@ int32_t mndInitConsumer(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupConsumer(SMnode *pMnode) {}
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
ASSERT(pConsumer);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__LOST;
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_LOST, &pMsg->rpcMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
mndTransDrop(pTrans);
return 0;
FAIL:
// TODO delete consumer
mndTransDrop(pTrans);
return -1;
}
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe));
}
return pRebSub;
}
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer;
void *pIter = NULL;
// rebalance cannot be parallel
int8_t old = atomic_val_compare_exchange_8(&mqInRebFlag, 0, 1);
if (old != 0) {
mInfo("mq rebalance already in progress, do nothing");
return 0;
}
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
// TODO set cleanfp
pRebMsg->mqInReb = &mqInRebFlag;
// iterate all consumers, find all modification
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
taosRLockLatch(&pConsumer->lock);
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg *pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg));
pRpcMsg->msgType = TDMT_MND_MQ_CONSUMER_LOST;
pRpcMsg->pCont = pLostMsg;
pRpcMsg->contLen = sizeof(SMqConsumerLostMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, pRpcMsg);
}
if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) {
// do nothing
} else if (status == MQ_CONSUMER_STATUS__LOST) {
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
}
int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < removedTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
} else {
// do nothing
}
taosRUnLockLatch(&pConsumer->lock);
mndReleaseConsumer(pMnode, pConsumer);
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered");
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_MQ_DO_REBALANCE,
.pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
mInfo("mq rebalance finished, no modification");
atomic_store_8(&mqInRebFlag, 0);
}
return 0;
}
static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp = {0};
int64_t consumerId = be64toh(pReq->consumerId);
int32_t epoch = ntohl(pReq->epoch);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId);
if (pConsumer == NULL) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
/*int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);*/
atomic_store_32(&pConsumer->hbStatus, 0);
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
if (status == MQ_CONSUMER_STATUS__LOST) {
// recover consumer
}
if (status != MQ_CONSUMER_STATUS__READY) {
mndReleaseConsumer(pMnode, pConsumer);
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
return -1;
}
int32_t serverEpoch = atomic_load_32(&pConsumer->epoch);
// 2. check epoch, only send ep info when epoches do not match
if (epoch != serverEpoch) {
taosRLockLatch(&pConsumer->lock);
int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics);
rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp));
if (rsp.topics == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
// handle all topic subscribed by the consumer
for (int32_t i = 0; i < numOfTopics; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created
ASSERT(pSub);
taosRLockLatch(&pSub->lock);
SMqSubTopicEp topicEp = {0};
strcpy(topicEp.topic, topic);
// 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
topicEp.schema.nCols = pTopic->schema.nCols;
topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema));
memcpy(topicEp.schema.pSchema, pTopic->schema.pSchema, topicEp.schema.nCols * sizeof(SSchema));
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
// 2.2 iterate all vg assigned to the consumer of that topic
SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pEpInSub->vgs);
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
if (topicEp.vgs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
for (int32_t j = 0; j < vgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
char offsetKey[TSDB_PARTITION_KEY_LEN];
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
// 2.2.1 build vg ep
SMqSubVgEp vgEp = {
.epSet = pVgEp->epSet,
.vgId = pVgEp->vgId,
.offset = -1,
};
// 2.2.2 fetch vg offset
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
if (pOffsetObj != NULL) {
vgEp.offset = atomic_load_64(&pOffsetObj->offset);
mndReleaseOffset(pMnode, pOffsetObj);
}
taosArrayPush(topicEp.vgs, &vgEp);
}
taosArrayPush(rsp.topics, &topicEp);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
taosRUnLockLatch(&pConsumer->lock);
}
// encode rsp
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return -1;
}
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = serverEpoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
pConsumer->recentRemovedTopics = taosArrayInit(1, sizeof(char *));
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
// release consumer and free memory
tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
// send rsp
pMsg->pRsp = buf;
pMsg->rspLen = tlen;
return 0;
FAIL:
tDeleteSMqCMGetSubEpRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer) {
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup;
SMqConsumerObj *pConsumerOld = NULL;
SMqConsumerObj *pConsumerNew = NULL;
int32_t code = -1;
SArray *newSub = subscribe.topicNames;
taosArraySortString(newSub, taosArrayCompareString);
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
for (int32_t i = 0; i < newTopicNum; i++) {
char *topic = taosArrayGetP(newSub, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
goto SUBSCRIBE_OVER;
}
// TODO lock topic to prevent drop
mndReleaseTopic(pMnode, pTopic);
}
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
/*pConsumerNew->waitingRebTopics = newSub;*/
pConsumerNew->rebNewTopics = newSub;
subscribe.topicNames = NULL;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
} else {
taosRLockLatch(&pConsumerOld->lock);
int32_t status = atomic_load_32(&pConsumerOld->status);
if (status != MQ_CONSUMER_STATUS__READY) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto SUBSCRIBE_OVER;
}
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
if (pConsumerNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto SUBSCRIBE_OVER;
}
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
/*pConsumerOld->waitingRebTopics = newSub;*/
int32_t oldTopicNum = 0;
if (pConsumerOld->currentTopics) {
oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics);
}
int32_t i = 0, j = 0;
while (i < oldTopicNum || j < newTopicNum) {
if (i >= oldTopicNum) {
char *newTopicCopy = strdup(taosArrayGetP(newSub, j));
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
j++;
continue;
} else if (j >= newTopicNum) {
char *oldTopicCopy = strdup(taosArrayGetP(pConsumerOld->currentTopics, i));
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
i++;
continue;
} else {
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
char *newTopic = taosArrayGetP(newSub, j);
int comp = compareLenPrefixedStr(oldTopic, newTopic);
if (comp == 0) {
i++;
j++;
continue;
} else if (comp < 0) {
char *oldTopicCopy = strdup(oldTopic);
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
i++;
continue;
} else {
char *newTopicCopy = strdup(newTopic);
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
j++;
continue;
}
}
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
}
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
SUBSCRIBE_OVER:
if (pConsumerOld) {
taosRUnLockLatch(&pConsumerOld->lock);
mndReleaseConsumer(pMnode, pConsumerOld);
}
if (pConsumerNew) {
tDeleteSMqConsumerObj(pConsumerNew);
}
// TODO: replace with destroy subscribe msg
if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code;
}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
......@@ -154,15 +532,141 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId);
tDeleteSMqConsumerObj(pConsumer);
return 0;
}
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
/*taosWLockLatch(&pOldConsumer->lock);*/
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
/*taosWUnLockLatch(&pOldConsumer->lock);*/
taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
tmp = pOldConsumer->rebRemovedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pNewConsumer->rebRemovedTopics = tmp;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
pOldConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
taosArrayPush(pNewConsumer->rebRemovedTopics, &topic);
}
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
char *addedTopic = strdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic
#if 1
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->currentTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
ASSERT(strcmp(topic, addedTopic) != 0);
}
#endif
// remove from new topic
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
if (strcmp(addedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebNewTopics, i);
taosMemoryFree(topic);
break;
}
}
// add to current topic
taosArrayPush(pOldConsumer->currentTopics, &addedTopic);
taosArraySortString(pOldConsumer->currentTopics, taosArrayCompareString);
// set status
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
// not exist in new topic
#if 1
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
ASSERT(strcmp(topic, removedTopic) != 0);
}
#endif
// remove from removed topic
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebNewTopics, i);
taosMemoryFree(topic);
break;
}
}
// remove from current topic
int32_t i = 0;
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->currentTopics, i);
taosMemoryFree(topic);
break;
}
}
// must find the topic
ASSERT(i < sz);
// set status
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB ||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY ||
pOldConsumer->status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (pOldConsumer->status == MQ_CONSUMER_STATUS__LOST ||
pOldConsumer->status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
}
taosWUnLockLatch(&pOldConsumer->lock);
return 0;
}
......
......@@ -14,6 +14,386 @@
*/
#include "mndDef.h"
#include "mndConsumer.h"
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
return NULL;
}
pConsumer->consumerId = consumerId;
memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
pConsumer->epoch = 0;
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pConsumer->hbStatus = 0;
taosInitRWLatch(&pConsumer->lock);
pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
#if 0
pConsumer->waitingRebTopics = NULL;
#endif
pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL) {
taosArrayDestroy(pConsumer->currentTopics);
taosArrayDestroy(pConsumer->rebNewTopics);
taosArrayDestroy(pConsumer->rebRemovedTopics);
taosMemoryFree(pConsumer);
return NULL;
}
return pConsumer;
}
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
if (pConsumer->currentTopics) {
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
}
#if 0
if (pConsumer->waitingRebTopics) {
taosArrayDestroyP(pConsumer->waitingRebTopics, taosMemoryFree);
}
#endif
if (pConsumer->rebNewTopics) {
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
}
if (pConsumer->rebRemovedTopics) {
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
}
}
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
int32_t tlen = 0;
int32_t sz;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeString(buf, pConsumer->cgroup);
tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
tlen += taosEncodeFixedI32(buf, pConsumer->status);
// current topics
if (pConsumer->currentTopics) {
sz = taosArrayGetSize(pConsumer->currentTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#if 0
// waiting reb topics
if (pConsumer->waitingRebTopics) {
sz = taosArrayGetSize(pConsumer->waitingRebTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->waitingRebTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#endif
// reb new topics
if (pConsumer->rebNewTopics) {
sz = taosArrayGetSize(pConsumer->rebNewTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
// reb removed topics
if (pConsumer->rebRemovedTopics) {
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
return tlen;
}
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
buf = taosDecodeFixedI32(buf, &pConsumer->status);
// current topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->currentTopics, &topic);
}
#if 0
// waiting reb topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->waitingRebTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->waitingRebTopics, &topic);
}
#endif
// reb new topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->rebNewTopics, &topic);
}
// reb removed topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->rebRemovedTopics, &topic);
}
return (void *)buf;
}
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId;
pVgEpNew->qmsg = strdup(pVgEp->qmsg);
/*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/
pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew;
}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); }
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeString(buf, pVgEp->qmsg);
/*tlen += taosEncodeString(buf, pVgEp->topic);*/
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeString(buf, &pVgEp->qmsg);
/*buf = taosDecodeStringTo(buf, pVgEp->topic);*/
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return (void *)buf;
}
SMqConsumerEpObj *tCloneSMqConsumerEpObj(const SMqConsumerEpObj *pConsumerEp) {
SMqConsumerEpObj *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEpObj));
if (pConsumerEpNew == NULL) return NULL;
pConsumerEpNew->consumerId = pConsumerEp->consumerId;
memcpy(pConsumerEpNew->cgroup, pConsumerEp->cgroup, TSDB_CGROUP_LEN);
taosInitRWLatch(&pConsumerEpNew->lock);
pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpNew->vgs, (FCopy)tCloneSMqVgEp);
return pConsumerEpNew;
}
void tDeleteSMqConsumerEpObj(SMqConsumerEpObj *pConsumerEp) {
taosArrayDestroyEx(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSMqConsumerEpObj(void **buf, const SMqConsumerEpObj *pConsumerEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeString(buf, pConsumerEp->cgroup);
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
return tlen;
}
void *tDecodeSMqConsumerEpObj(const void *buf, SMqConsumerEpObj *pConsumerEp) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeStringTo(buf, pConsumerEp->cgroup);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
return (void *)buf;
}
SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) {
SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
if (pEpInSubNew == NULL) return NULL;
pEpInSubNew->consumerId = pEpInSub->consumerId;
pEpInSubNew->vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp);
return pEpInSubNew;
}
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) {
taosArrayDestroyEx(pEpInSub->vgs, (FDelete)tDeleteSMqVgEp);
}
int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId);
tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);
return tlen;
}
void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId);
buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
return (void *)buf;
}
SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = -1;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// TODO set free fp
SMqConsumerEpInSub *pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
pEpInSub->vgs = taosArrayInit(0, sizeof(SMqVgEp));
int64_t unexistKey = -1;
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), pEpInSub, sizeof(SMqConsumerEpInSub));
return pSubNew;
}
SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
if (pSubNew == NULL) return NULL;
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock);
pSubNew->vgNum = pSub->vgNum;
/*pSubNew->consumerEps = taosArrayDeepCopy(pSub->consumerEps, (FCopy)tCloneSMqConsumerEpInSub);*/
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
void *pIter = NULL;
SMqConsumerEpInSub *pEpInSub = NULL;
while (1) {
pIter = taosHashIterate(pSubNew->consumerHash, pIter);
if (pIter == NULL) break;
pEpInSub = (SMqConsumerEpInSub *)pIter;
SMqConsumerEpInSub newEp = {
.consumerId = pEpInSub->consumerId,
.vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp),
};
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEpInSub));
}
return pSubNew;
}
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
/*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/
taosHashCleanup(pSub->consumerHash);
}
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
void *pIter = NULL;
int32_t sz = taosHashGetSize(pSub->consumerHash);
tlen += taosEncodeFixedI32(buf, sz);
int32_t cnt = 0;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
tlen += tEncodeSMqConsumerEpInSub(buf, pEpInSub);
cnt++;
}
ASSERT(cnt == sz);
/*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/
return tlen;
}
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
//
buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
for (int32_t i = 0; i < sz; i++) {
/*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/
SMqConsumerEpInSub epInSub = {0};
buf = tDecodeSMqConsumerEpInSub(buf, &epInSub);
taosHashPut(pSub->consumerHash, &epInSub.consumerId, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
}
/*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/
return (void *)buf;
}
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
if (pEntryNew == NULL) return NULL;
pEntryNew->epoch = pEntry->epoch;
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEpInSub);
return pEntryNew;
}
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEpInSub);
}
int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pEntry->epoch);
tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
return tlen;
}
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
buf = taosDecodeFixedI32(buf, &pEntry->epoch);
buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
return (void *)buf;
}
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
if (pLogNew == NULL) return pLogNew;
memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEpInSub);
return pLogNew;
}
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEpInSub);
}
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pLog->key);
tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
return tlen;
}
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
buf = taosDecodeStringTo(buf, pLog->key);
buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
return (void *)buf;
}
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
......
......@@ -453,6 +453,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
}
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
int64_t unexistKey = -1;
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
......@@ -466,24 +469,33 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
pVgEp->epSet = plan->execNode.epSet;
pVgEp->vgId = plan->execNode.nodeId;
#if 0
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
mDebug("init subscribption %s, assign vg: %d", pSub->key, consumerEp.vgId);
#endif
mDebug("init subscribption %s, assign vg: %d", pSub->key, pVgEp->vgId);
int32_t msgLen;
if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(pSub->unassignedVg, &consumerEp);
taosArrayPush(pEpInSub->vgs, &pVgEp);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
taosHashRelease(pSub->consumerHash, pEpInSub);
qDestroyQueryPlan(pPlan);
return 0;
......
......@@ -40,30 +40,47 @@ enum {
MQ_SUBSCRIBE_STATUS__DELETED,
};
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);
/*static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);*/
/*static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);*/
static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);
static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
/*static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);*/
/*static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);*/
/*static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);*/
/*static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);*/
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg);
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp);
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
const char *topicName);
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,
const char *oldTopicName);
/*static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,*/
/*const SMqConsumerEp *pConsumerEp);*/
/*static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/
/*const char *topicName);*/
/*static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/
/*const char *oldTopicName);*/
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
......@@ -74,16 +91,35 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);*/
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
return sdbSetTable(pMnode->pSdb, table);
}
static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) {
SMqSubscribeObj *pSub = tNewSubscribeObj(subKey);
if (pSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
tDeleteSubscribeObj(pSub);
taosMemoryFree(pSub);
return NULL;
}
return pSub;
}
#if 0
static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) {
SMqSubscribeObj *pSub = tNewSubscribeObj();
if (pSub == NULL) {
......@@ -212,7 +248,63 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq
return 0;
}
#endif
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subKey, const SMqRebOutputVg *pRebVg) {
SMqRebVgReq req = {0};
req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId;
req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = req.qmsg;
strncpy(req.subKey, subKey, TSDB_SUBSCRIBE_KEY_LEN);
int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req);
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pRebVg->pVgEp->vgId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqRebVgReq(&abuf, &req);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const char *subKey,
const SMqRebOutputVg *pRebVg) {
ASSERT(pRebVg->oldConsumerId != pRebVg->newConsumerId);
void *buf;
int32_t tlen;
if (mndBuildSubChangeReq(&buf, &tlen, subKey, pRebVg) < 0) {
return -1;
}
int32_t vgId = pRebVg->pVgEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_MQ_VG_CHANGE;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
#if 0
static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
......@@ -312,6 +404,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
pMsg->rspLen = tlen;
return 0;
}
#endif
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
int32_t i = 0;
......@@ -337,6 +430,7 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebSub;
}
#if 0
static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SSdb *pSdb = pMnode->pSdb;
......@@ -408,7 +502,9 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
}
return 0;
}
#endif
#if 0
static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
......@@ -422,7 +518,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (pIter == NULL) break;
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
taosMemoryFreeClear(pRebSub->key);
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum,
(int32_t)taosArrayGetSize(pSub->unassignedVg));
......@@ -562,6 +657,346 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mndTransDrop(pTrans);
return 0;
}
#endif
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
if (pInput->pTopic != NULL) {
// create subscribe
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
} else {
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
}
int32_t totalVgNum = pOutput->pSub->vgNum;
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into hash
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
ASSERT(pEpInSub);
if (pEpInSub) {
ASSERT(consumerId == pEpInSub->consumerId);
actualRemoved++;
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
SMqRebOutputVg outputVg = {
.oldConsumerId = consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
}
taosHashRelease(pOutput->pSub->consumerHash, pEpInSub);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed
taosArrayPush(pOutput->removedConsumers, &consumerId);
}
}
ASSERT(removedNum == actualRemoved);
// if previously no consumer, there are vgs not assigned
{
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t));
ASSERT(pEpInSub);
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
for (int32_t i = 0; i < consumerVgNum; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
SMqRebOutputVg rebOutput = {
.oldConsumerId = -1,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
}
taosHashRelease(pOutput->pSub->consumerHash, pEpInSub);
}
// 3. calc vg number of each consumer
int32_t actualConsumerNum = taosHashGetSize(pInput->pOldSub->consumerHash) - 1 +
taosArrayGetSize(pInput->pRebInfo->newConsumers) -
taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t afterRebConsumerNum = taosHashGetSize(pOutput->pSub->consumerHash) - 1;
ASSERT(afterRebConsumerNum == actualConsumerNum);
// calc num
int32_t minVgCnt = totalVgNum / actualConsumerNum;
int32_t imbConsumerNum = totalVgNum % actualConsumerNum;
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t imbCnt = 0;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
ASSERT(pEpInSub->consumerId > 0);
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
// all old consumers still existing are touched
// TODO optimize: touch only consumer whose vgs changed
taosArrayPush(pOutput->touchedConsumers, &pEpInSub->consumerId);
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) {
continue;
} else {
// pop until equal minVg + 1
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt + 1) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pEpInSub->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
}
imbCnt++;
}
} else {
// pop until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pEpInSub->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
}
}
}
}
// 5. add new consumer into sub
{
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerEpInSub newConsumerEp;
newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp,
sizeof(SMqConsumerEpInSub));
taosArrayPush(pOutput->newConsumers, &consumerId);
}
}
// 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer.
// All related vg should be put into rebVgs
SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL;
pIter = NULL;
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
ASSERT(pEpInSub->consumerId > 0);
/*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/
if (imbCnt < imbConsumerNum) {
imbCnt++;
// push until equal minVg + 1
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt + 1) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
} else {
// push until equal minVg
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) {
// iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter);
pRebVg = (SMqRebOutputVg *)pRemovedIter;
// push
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pEpInSub->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg);
}
}
}
// 7. handle unassigned vg
if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) {
// if has consumer, vg should be all assigned
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
ASSERT(pRemovedIter == NULL);
} else {
// if all consumer is removed, put all vg into unassigned
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t));
ASSERT(pEpInSub);
ASSERT(pEpInSub->consumerId == -1);
pIter = NULL;
SMqRebOutputVg *pRebOutput = NULL;
while (1) {
pIter = taosHashIterate(pHash, pIter);
if (pIter == NULL) break;
pRebOutput = (SMqRebOutputVg *)pIter;
ASSERT(pRebOutput->newConsumerId == -1);
taosArrayPush(pEpInSub->vgs, pRebOutput->pVgEp);
}
}
// 8. generate logs
// 9. clear
taosHashCleanup(pHash);
return 0;
}
static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
if (pTrans == NULL) {
return -1;
}
// make txn:
// 1. redo action: action to all vg
const SArray *rebVgs = pOutput->rebVgs;
int32_t vgNum = taosArrayGetSize(rebVgs);
for (int32_t i = 0; i < vgNum; i++) {
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub->key, pRebVg) < 0) {
goto REB_FAIL;
}
}
// 2. redo log: subscribe and vg assignment
// subscribe
if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) {
goto REB_FAIL;
}
// 3. commit log: consumer to update status and epoch
// 3.1 set touched consumer
int32_t consumerNum = taosArrayGetSize(pOutput->touchedConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->touchedConsumers, i);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
goto REB_FAIL;
}
}
// 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__ADD;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup);
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
goto REB_FAIL;
}
}
// 3.3 set removed consumer
consumerNum = taosArrayGetSize(pOutput->removedConsumers);
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
ASSERT(consumerId > 0);
SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE;
char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup);
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
goto REB_FAIL;
}
}
// 4. commit log: modification log
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
mndTransDrop(pTrans);
return 0;
REB_FAIL:
mndTransDrop(pTrans);
return -1;
}
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
void *pIter = NULL;
mInfo("mq rebalance start");
while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) break;
SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0};
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
if (pSub == NULL) {
taosRLockLatch(&pSub->lock);
// split sub key and extract topic
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic);
taosRLockLatch(&pTopic->lock);
rebInput.pTopic = pTopic;
}
rebInput.pRebInfo = pRebSub;
rebInput.pOldSub = pSub;
int32_t unassignedVgNum = 0;
int64_t key = -1;
SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &key, sizeof(int64_t));
if (pEpInSub != NULL) {
ASSERT(pEpInSub->consumerId == key);
unassignedVgNum = taosArrayGetSize(pEpInSub->vgs);
}
mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, unassignedVgNum);
// TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
if (rebInput.pTopic) {
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
}
// reset flag
atomic_store_8(pReq->mqInReb, 0);
mInfo("mq rebalance completed successfully");
taosHashCleanup(pReq->rebSubHash);
return 0;
}
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp) {
......@@ -697,16 +1132,23 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) {
static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) {
mTrace("subscribe:%s, perform delete action", pSub->key);
tDeleteSMqSubscribeObj(pSub);
tDeleteSubscribeObj(pSub);
return 0;
}
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) {
mTrace("subscribe:%s, perform update action", pOldSub->key);
taosWLockLatch(&pOldSub->lock);
SHashObj *tmp = pOldSub->consumerHash;
pOldSub->consumerHash = pNewSub->consumerHash;
pNewSub->consumerHash = tmp;
taosWUnLockLatch(&pOldSub->lock);
return 0;
}
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
int32_t tlen = strlen(cgroup);
memcpy(key, cgroup, tlen);
key[tlen] = TMQ_SEPARATOR;
......@@ -739,6 +1181,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease(pSdb, pSub);
}
#if 0
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
......@@ -901,6 +1344,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
#endif
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "tarray.h"
#include "tcoding.h"
SArray* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0);
......@@ -312,7 +313,14 @@ void* taosArrayDestroy(SArray* pArray) {
return NULL;
}
void taosArrayDestroyEx(SArray* pArray, void (*fp)(void*)) {
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
taosArrayDestroy(pArray);
}
void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
if (pArray == NULL) {
return;
}
......@@ -421,6 +429,37 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
return;
}
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
SArray* pArray = taosArrayInit(pSrc->size, pSrc->elemSize);
for (int32_t i = 0; i < pSrc->size; i++) {
void* clone = deepCopy(taosArrayGetP(pSrc, i));
taosArrayPush(pArray, &clone);
}
return pArray;
}
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) {
int32_t tlen = 0;
int32_t sz = pArray->size;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
void* data = taosArrayGetP(pArray, i);
tlen += encode(buf, data);
}
return tlen;
}
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) {
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
*pArray = taosArrayInit(sz, sizeof(void*));
for (int32_t i = 0; i < sz; i++) {
void* data = taosMemoryCalloc(1, dataSz);
buf = decode(buf, data);
}
return (void*)buf;
}
// order array<type *>
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) {
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册