From 2dcb0145118ae9f6b8171be5184971f303910103 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 21 Apr 2022 16:37:55 +0800 Subject: [PATCH] feat(tmq): add db subscribe --- include/common/tmsg.h | 43 +- source/common/src/tmsg.c | 12 +- source/dnode/mnode/impl/inc/mndDef.h | 341 +------------ source/dnode/mnode/impl/src/mndDef.c | 97 ++-- source/dnode/mnode/impl/src/mndOffset.c | 6 +- source/dnode/mnode/impl/src/mndSubscribe.c | 564 +-------------------- source/dnode/mnode/impl/src/mndTopic.c | 86 ++-- source/dnode/vnode/src/tq/tq.c | 121 ----- 8 files changed, 154 insertions(+), 1116 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a06b102458..b2fd0e3918 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1273,11 +1273,16 @@ typedef struct { } SMVCreateStreamRsp, SMSCreateStreamRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - int8_t igExists; - char* sql; - char* ast; - char subscribeDbName[TSDB_DB_NAME_LEN]; + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; + char* sql; + char* ast; + int64_t subDbUid; + char subscribeDbName[TSDB_DB_NAME_LEN]; } SCMCreateTopicReq; int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); @@ -1932,12 +1937,22 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { return buf; } +enum { + TOPIC_SUB_TYPE__DB = 1, + TOPIC_SUB_TYPE__TABLE, +}; + typedef struct { int64_t leftForVer; int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; char* qmsg; } SMqRebVgReq; @@ -1948,7 +1963,14 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeString(buf, pReq->subKey); - tlen += taosEncodeString(buf, pReq->qmsg); + tlen += taosEncodeFixedI8(buf, pReq->subType); + tlen += taosEncodeFixedI8(buf, pReq->withTbName); + tlen += taosEncodeFixedI8(buf, pReq->withSchema); + tlen += taosEncodeFixedI8(buf, pReq->withTag); + tlen += taosEncodeFixedI8(buf, pReq->withTagSchema); + if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + tlen += taosEncodeString(buf, pReq->qmsg); + } return tlen; } @@ -1958,7 +1980,14 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeStringTo(buf, pReq->subKey); - buf = taosDecodeString(buf, &pReq->qmsg); + buf = taosDecodeFixedI8(buf, &pReq->subType); + buf = taosDecodeFixedI8(buf, &pReq->withTbName); + buf = taosDecodeFixedI8(buf, &pReq->withSchema); + buf = taosDecodeFixedI8(buf, &pReq->withTag); + buf = taosDecodeFixedI8(buf, &pReq->withTagSchema); + if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + buf = taosDecodeString(buf, &pReq->qmsg); + } return (void*)buf; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8b48d7914a..65b68e7eb6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2674,6 +2674,10 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; + if (tEncodeI8(&encoder, pReq->withTagSchema) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; @@ -2696,6 +2700,10 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->withTagSchema) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; @@ -3032,7 +3040,6 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq return 0; } - int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -3052,7 +3059,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq SReplica *pReplica = &pReq->replicas[i]; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; } - + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3085,7 +3092,6 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR return 0; } - int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 12710f0d4c..8cfa3944d4 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -418,82 +418,6 @@ typedef struct { char payload[]; } SSysTableRetrieveObj; -typedef struct { - int32_t vgId; // -1 for unassigned - int32_t status; - int32_t epoch; - SEpSet epSet; - int64_t oldConsumerId; - int64_t consumerId; // -1 for unassigned - char* qmsg; -} SMqConsumerEp; - -static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pConsumerEp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); - tlen += taosEncodeFixedI32(buf, pConsumerEp->status); - tlen += taosEncodeFixedI32(buf, pConsumerEp->epoch); - tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); - tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId); - tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += taosEncodeString(buf, pConsumerEp->qmsg); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { - buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); - buf = taosDecodeFixedI32(buf, &pConsumerEp->status); - buf = taosDecodeFixedI32(buf, &pConsumerEp->epoch); - buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); - buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId); - buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeString(buf, &pConsumerEp->qmsg); - return buf; -} - -static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) { - if (pConsumerEp) { - taosMemoryFreeClear(pConsumerEp->qmsg); - } -} - -typedef struct { - int64_t consumerId; - SArray* vgInfo; // SArray -} SMqSubConsumer; - -static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsumer* pConsumer) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); - int32_t sz = taosArrayGetSize(pConsumer->vgInfo); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i); - tlen += tEncodeSMqConsumerEp(buf, pCEp); - } - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSubConsumer(void** buf, SMqSubConsumer* pConsumer) { - int32_t sz; - buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); - buf = taosDecodeFixedI32(buf, &sz); - pConsumer->vgInfo = taosArrayInit(sz, sizeof(SMqConsumerEp)); - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp consumerEp; - buf = tDecodeSMqConsumerEp(buf, &consumerEp); - taosArrayPush(pConsumer->vgInfo, &consumerEp); - } - return buf; -} - -static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) { - if (pSubConsumer->vgInfo) { - taosArrayDestroyEx(pSubConsumer->vgInfo, (void (*)(void*))tDeleteSMqConsumerEp); - pSubConsumer->vgInfo = NULL; - } -} - typedef struct { char key[TSDB_PARTITION_KEY_LEN]; int64_t offset; @@ -512,147 +436,21 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) return buf; } -#if 0 -typedef struct { - char key[TSDB_SUBSCRIBE_KEY_LEN]; - int32_t status; - int32_t vgNum; - SArray* consumers; // SArray - SArray* lostConsumers; // SArray - SArray* unassignedVg; // SArray -} SMqSubscribeObj; - -static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { - SMqSubscribeObj* pSub = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); - if (pSub == NULL) { - return NULL; - } - - pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer)); - if (pSub->consumers == NULL) { - goto _err; - } - - pSub->lostConsumers = taosArrayInit(0, sizeof(SMqSubConsumer)); - if (pSub->lostConsumers == NULL) { - goto _err; - } - - pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); - if (pSub->unassignedVg == NULL) { - goto _err; - } - - pSub->key[0] = 0; - pSub->vgNum = 0; - pSub->status = 0; - - return pSub; - -_err: - taosMemoryFreeClear(pSub->consumers); - taosMemoryFreeClear(pSub->lostConsumers); - taosMemoryFreeClear(pSub->unassignedVg); - taosMemoryFreeClear(pSub); - return NULL; -} - -static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { - int32_t tlen = 0; - tlen += taosEncodeString(buf, pSub->key); - tlen += taosEncodeFixedI32(buf, pSub->vgNum); - tlen += taosEncodeFixedI32(buf, pSub->status); - int32_t sz; - - sz = taosArrayGetSize(pSub->consumers); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i); - tlen += tEncodeSMqSubConsumer(buf, pSubConsumer); - } - - sz = taosArrayGetSize(pSub->lostConsumers); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->lostConsumers, i); - tlen += tEncodeSMqSubConsumer(buf, pSubConsumer); - } - - sz = taosArrayGetSize(pSub->unassignedVg); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i); - tlen += tEncodeSMqConsumerEp(buf, pCEp); - } - - return tlen; -} - -static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) { - buf = taosDecodeStringTo(buf, pSub->key); - buf = taosDecodeFixedI32(buf, &pSub->vgNum); - buf = taosDecodeFixedI32(buf, &pSub->status); - - int32_t sz; - - buf = taosDecodeFixedI32(buf, &sz); - pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer)); - if (pSub->consumers == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqSubConsumer subConsumer = {0}; - buf = tDecodeSMqSubConsumer(buf, &subConsumer); - taosArrayPush(pSub->consumers, &subConsumer); - } - - buf = taosDecodeFixedI32(buf, &sz); - pSub->lostConsumers = taosArrayInit(sz, sizeof(SMqSubConsumer)); - if (pSub->lostConsumers == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqSubConsumer subConsumer = {0}; - buf = tDecodeSMqSubConsumer(buf, &subConsumer); - taosArrayPush(pSub->lostConsumers, &subConsumer); - } - - buf = taosDecodeFixedI32(buf, &sz); - pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); - if (pSub->unassignedVg == NULL) { - return NULL; - } - for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp consumerEp = {0}; - buf = tDecodeSMqConsumerEp(buf, &consumerEp); - taosArrayPush(pSub->unassignedVg, &consumerEp); - } - return buf; -} - -static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { - if (pSub->consumers) { - // taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); - // taosArrayDestroy(pSub->consumers); - pSub->consumers = NULL; - } - - if (pSub->unassignedVg) { - // taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); - // taosArrayDestroy(pSub->unassignedVg); - pSub->unassignedVg = NULL; - } -} -#endif - typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int64_t createTime; - int64_t updateTime; - int64_t uid; + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + int64_t uid; + // TODO: use subDbUid int64_t dbUid; + int64_t subDbUid; int32_t version; + int8_t subType; // db or table + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; SRWLatch lock; int32_t sqlLen; int32_t astLen; @@ -662,79 +460,6 @@ typedef struct { SSchemaWrapper schema; } SMqTopicObj; -#if 0 -typedef struct { - int64_t consumerId; - int64_t connId; - SRWLatch lock; - char cgroup[TSDB_CGROUP_LEN]; - SArray* currentTopics; // SArray - SArray* recentRemovedTopics; // SArray - int32_t epoch; - // stat - int64_t pollCnt; - // status - int32_t status; - // heartbeat from the consumer reset hbStatus to 0 - // each checkConsumerAlive msg add hbStatus by 1 - // if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost - int32_t hbStatus; -} SMqConsumerObj; - -static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { - int32_t sz; - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); - tlen += taosEncodeFixedI64(buf, pConsumer->connId); - tlen += taosEncodeFixedI32(buf, pConsumer->epoch); - tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt); - tlen += taosEncodeFixedI32(buf, pConsumer->status); - tlen += taosEncodeString(buf, pConsumer->cgroup); - - sz = taosArrayGetSize(pConsumer->currentTopics); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - char* topic = taosArrayGetP(pConsumer->currentTopics, i); - tlen += taosEncodeString(buf, topic); - } - - sz = taosArrayGetSize(pConsumer->recentRemovedTopics); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i); - tlen += taosEncodeString(buf, topic); - } - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { - int32_t sz; - buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); - buf = taosDecodeFixedI64(buf, &pConsumer->connId); - buf = taosDecodeFixedI32(buf, &pConsumer->epoch); - buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); - buf = taosDecodeFixedI32(buf, &pConsumer->status); - buf = taosDecodeStringTo(buf, pConsumer->cgroup); - - buf = taosDecodeFixedI32(buf, &sz); - pConsumer->currentTopics = taosArrayInit(sz, sizeof(void*)); - for (int32_t i = 0; i < sz; i++) { - char* topic; - buf = taosDecodeString(buf, &topic); - taosArrayPush(pConsumer->currentTopics, &topic); - } - - buf = taosDecodeFixedI32(buf, &sz); - pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(void*)); - for (int32_t i = 0; i < sz; i++) { - char* topic; - buf = taosDecodeString(buf, &topic); - taosArrayPush(pConsumer->recentRemovedTopics, &topic); - } - return buf; -} -#endif - enum { CONSUMER_UPDATE__TOUCH = 1, CONSUMER_UPDATE__ADD, @@ -753,12 +478,9 @@ typedef struct { int32_t hbStatus; // lock is used for topics update SRWLatch lock; - SArray* currentTopics; // SArray -#if 0 - SArray* waitingRebTopics; // SArray -#endif - SArray* rebNewTopics; // SArray - SArray* rebRemovedTopics; // SArray + SArray* currentTopics; // SArray + SArray* rebNewTopics; // SArray + SArray* rebRemovedTopics; // SArray } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); @@ -768,9 +490,13 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer typedef struct { int32_t vgId; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; char* qmsg; - // char topic[TSDB_TOPIC_FNAME_LEN]; - SEpSet epSet; + SEpSet epSet; } SMqVgEp; SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); @@ -792,7 +518,14 @@ typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; SRWLatch lock; int32_t vgNum; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub + // TODO put -1 into unassignVgs + // SArray* unassignedVgs; } SMqSubscribeObj; SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); @@ -821,18 +554,6 @@ void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); -typedef struct { - int64_t consumerId; - char cgroup[TSDB_CGROUP_LEN]; - SRWLatch lock; - SArray* vgs; // SArray -} SMqConsumerEpObj; - -SMqConsumerEpObj* tCloneSMqConsumerEpObj(const SMqConsumerEpObj* pConsumerEp); -void tDeleteSMqConsumerEpObj(SMqConsumerEpObj* pConsumerEp); -int32_t tEncodeSMqConsumerEpObj(void** buf, const SMqConsumerEpObj* pConsumerEp); -void* tDecodeSMqConsumerEpObj(const void* buf, SMqConsumerEpObj* pConsumerEp); - typedef struct { const SMqSubscribeObj* pOldSub; const SMqTopicObj* pTopic; @@ -845,12 +566,6 @@ typedef struct { SMqVgEp* pVgEp; } SMqRebOutputVg; -#if 0 -typedef struct { - int64_t consumerId; -} SMqRebOutputConsumer; -#endif - typedef struct { SArray* rebVgs; // SArray SArray* newConsumers; // SArray diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 37c819ae60..06440d9305 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -32,9 +32,6 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L taosInitRWLatch(&pConsumer->lock); pConsumer->currentTopics = taosArrayInit(0, sizeof(void *)); -#if 0 - pConsumer->waitingRebTopics = NULL; -#endif pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *)); pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *)); @@ -53,11 +50,6 @@ void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { if (pConsumer->currentTopics) { taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); } -#if 0 - if (pConsumer->waitingRebTopics) { - taosArrayDestroyP(pConsumer->waitingRebTopics, taosMemoryFree); - } -#endif if (pConsumer->rebNewTopics) { taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); } @@ -87,20 +79,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, 0); } -#if 0 - // waiting reb topics - if (pConsumer->waitingRebTopics) { - sz = taosArrayGetSize(pConsumer->waitingRebTopics); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pConsumer->waitingRebTopics, i); - tlen += taosEncodeString(buf, topic); - } - } else { - tlen += taosEncodeFixedI32(buf, 0); - } -#endif - // reb new topics if (pConsumer->rebNewTopics) { sz = taosArrayGetSize(pConsumer->rebNewTopics); @@ -145,17 +123,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { taosArrayPush(pConsumer->currentTopics, &topic); } -#if 0 - // waiting reb topics - buf = taosDecodeFixedI32(buf, &sz); - pConsumer->waitingRebTopics = taosArrayInit(sz, sizeof(void *)); - for (int32_t i = 0; i < sz; i++) { - char *topic; - buf = taosDecodeString(buf, &topic); - taosArrayPush(pConsumer->waitingRebTopics, &topic); - } -#endif - // reb new topics buf = taosDecodeFixedI32(buf, &sz); pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *)); @@ -181,6 +148,11 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); if (pVgEpNew == NULL) return NULL; pVgEpNew->vgId = pVgEp->vgId; + pVgEpNew->subType = pVgEp->subType; + pVgEpNew->withTbName = pVgEp->withTbName; + pVgEpNew->withSchema = pVgEp->withSchema; + pVgEpNew->withTag = pVgEp->withTag; + pVgEpNew->withTagSchema = pVgEp->withTagSchema; pVgEpNew->qmsg = strdup(pVgEp->qmsg); /*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/ pVgEpNew->epSet = pVgEp->epSet; @@ -192,6 +164,11 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { taosMemoryFree(pVgEp->qmsg); } int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); + tlen += taosEncodeFixedI8(buf, pVgEp->subType); + tlen += taosEncodeFixedI8(buf, pVgEp->withTbName); + tlen += taosEncodeFixedI8(buf, pVgEp->withSchema); + tlen += taosEncodeFixedI8(buf, pVgEp->withTag); + tlen += taosEncodeFixedI8(buf, pVgEp->withTagSchema); tlen += taosEncodeString(buf, pVgEp->qmsg); /*tlen += taosEncodeString(buf, pVgEp->topic);*/ tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); @@ -200,41 +177,17 @@ int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeFixedI8(buf, &pVgEp->subType); + buf = taosDecodeFixedI8(buf, &pVgEp->withTbName); + buf = taosDecodeFixedI8(buf, &pVgEp->withSchema); + buf = taosDecodeFixedI8(buf, &pVgEp->withTag); + buf = taosDecodeFixedI8(buf, &pVgEp->withTagSchema); buf = taosDecodeString(buf, &pVgEp->qmsg); /*buf = taosDecodeStringTo(buf, pVgEp->topic);*/ buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return (void *)buf; } -SMqConsumerEpObj *tCloneSMqConsumerEpObj(const SMqConsumerEpObj *pConsumerEp) { - SMqConsumerEpObj *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEpObj)); - if (pConsumerEpNew == NULL) return NULL; - pConsumerEpNew->consumerId = pConsumerEp->consumerId; - memcpy(pConsumerEpNew->cgroup, pConsumerEp->cgroup, TSDB_CGROUP_LEN); - taosInitRWLatch(&pConsumerEpNew->lock); - pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpNew->vgs, (FCopy)tCloneSMqVgEp); - return pConsumerEpNew; -} - -void tDeleteSMqConsumerEpObj(SMqConsumerEpObj *pConsumerEp) { - taosArrayDestroyEx(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); -} - -int32_t tEncodeSMqConsumerEpObj(void **buf, const SMqConsumerEpObj *pConsumerEp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += taosEncodeString(buf, pConsumerEp->cgroup); - tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); - return tlen; -} - -void *tDecodeSMqConsumerEpObj(const void *buf, SMqConsumerEpObj *pConsumerEp) { - buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = taosDecodeStringTo(buf, pConsumerEp->cgroup); - buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp)); - return (void *)buf; -} - SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) { SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub)); if (pEpInSubNew == NULL) return NULL; @@ -276,7 +229,7 @@ void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) { } SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { - SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj)); + SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); if (pSubNew == NULL) return NULL; memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN); taosInitRWLatch(&pSubNew->lock); @@ -297,8 +250,14 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { if (pSubNew == NULL) return NULL; memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); taosInitRWLatch(&pSubNew->lock); + + pSubNew->subType = pSub->subType; + pSubNew->withTbName = pSub->withTbName; + pSubNew->withSchema = pSub->withSchema; + pSubNew->withTag = pSub->withTag; + pSubNew->withTagSchema = pSub->withTagSchema; + pSubNew->vgNum = pSub->vgNum; - /*pSubNew->consumerEps = taosArrayDeepCopy(pSub->consumerEps, (FCopy)tCloneSMqConsumerEpInSub);*/ pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); /*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/ void *pIter = NULL; @@ -325,6 +284,11 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t tlen = 0; tlen += taosEncodeString(buf, pSub->key); tlen += taosEncodeFixedI32(buf, pSub->vgNum); + tlen += taosEncodeFixedI8(buf, pSub->subType); + tlen += taosEncodeFixedI8(buf, pSub->withTbName); + tlen += taosEncodeFixedI8(buf, pSub->withSchema); + tlen += taosEncodeFixedI8(buf, pSub->withTag); + tlen += taosEncodeFixedI8(buf, pSub->withTagSchema); void *pIter = NULL; int32_t sz = taosHashGetSize(pSub->consumerHash); @@ -347,6 +311,11 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { // buf = taosDecodeStringTo(buf, pSub->key); buf = taosDecodeFixedI32(buf, &pSub->vgNum); + buf = taosDecodeFixedI8(buf, &pSub->subType); + buf = taosDecodeFixedI8(buf, &pSub->withTbName); + buf = taosDecodeFixedI8(buf, &pSub->withSchema); + buf = taosDecodeFixedI8(buf, &pSub->withTag); + buf = taosDecodeFixedI8(buf, &pSub->withTagSchema); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index dad912b4e6..f5433e8f9e 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -133,9 +133,9 @@ OFFSET_DECODE_OVER: int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { int32_t sz = taosArrayGetSize(vgs); for (int32_t i = 0; i < sz; i++) { - SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i); - SMqOffsetObj offsetObj; - if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, pConsumerEp->vgId) < 0) { + int32_t vgId = *(int32_t *)taosArrayGet(vgs, i); + SMqOffsetObj offsetObj; + if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) { return -1; } offsetObj.offset = -1; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f708d4ffc1..7c0f979811 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -46,16 +46,8 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); -/*static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg);*/ -/*static int32_t mndProcessSubscribeRsp(SNodeMsg *pMsg);*/ -static int32_t mndProcessSubscribeInternalReq(SNodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); -/*static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg);*/ -/*static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg);*/ -/*static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg);*/ -/*static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);*/ - static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); @@ -73,15 +65,6 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO return 0; } -/*static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,*/ -/*const SMqConsumerEp *pConsumerEp);*/ - -/*static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/ -/*const char *topicName);*/ - -/*static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp,*/ -/*const char *oldTopicName);*/ - int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, .keyType = SDB_KEY_BINARY, @@ -91,13 +74,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndSubActionUpdate, .deleteFp = (SdbDeleteFp)mndSubActionDelete}; - /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_MQ_REB_RSP, mndProcessSubscribeInternalRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_MQ_CANCEL_CONN_RSP, mndProcessSubscribeInternalRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);*/ - /*mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);*/ - /*mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);*/ mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); return sdbSetTable(pMnode->pSdb, table); @@ -122,137 +98,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, return pSub; } -#if 0 -static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *cgroup) { - SMqSubscribeObj *pSub = tNewSubscribeObj(); - if (pSub == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - char key[TSDB_SUBSCRIBE_KEY_LEN]; - mndMakeSubscribeKey(key, cgroup, pTopic->name); - strcpy(pSub->key, key); - - if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { - tDeleteSMqSubscribeObj(pSub); - taosMemoryFree(pSub); - return NULL; - } - - // TODO: disable alter subscribed table - return pSub; -} - -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, - const char *topicName) { - SMqMVRebReq req = { - .vgId = pConsumerEp->vgId, - .oldConsumerId = pConsumerEp->oldConsumerId, - .newConsumerId = pConsumerEp->consumerId, - }; - req.topic = strdup(topicName); - - int32_t tlen = tEncodeSMqMVRebReq(NULL, &req); - void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - SMsgHead *pMsgHead = (SMsgHead *)buf; - - pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); - pMsgHead->vgId = htonl(pConsumerEp->vgId); - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqMVRebReq(&abuf, &req); - taosMemoryFree(req.topic); - - *pBuf = buf; - *pLen = tlen; - - return 0; -} - -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, - const char *topicName) { - ASSERT(pConsumerEp->oldConsumerId != -1); - - void *buf; - int32_t tlen; - if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp, topicName) < 0) { - return -1; - } - - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_REB; - - mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - return -1; - } - - return 0; -} - -static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, - const char *oldTopicName) { - SMqCancelConnReq req = {0}; - req.consumerId = pConsumerEp->consumerId; - req.vgId = pConsumerEp->vgId; - req.epoch = pConsumerEp->epoch; - strcpy(req.topicName, oldTopicName); - - int32_t tlen = tEncodeSMqCancelConnReq(NULL, &req); - void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - SMsgHead *pMsgHead = (SMsgHead *)buf; - - pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); - pMsgHead->vgId = htonl(pConsumerEp->vgId); - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqCancelConnReq(&abuf, &req); - *pBuf = buf; - *pLen = tlen; - return 0; -} - -static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, - const char *oldTopicName) { - void *buf; - int32_t tlen; - if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) { - return -1; - } - - int32_t vgId = pConsumerEp->vgId; - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_CANCEL_CONN; - - mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - return -1; - } - - return 0; -} -#endif - static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subKey, const SMqRebOutputVg *pRebVg) { SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; @@ -307,108 +152,6 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const ch return 0; } -#if 0 -static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { - SMnode *pMnode = pMsg->pNode; - SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; - SMqCMGetSubEpRsp rsp = {0}; - int64_t consumerId = be64toh(pReq->consumerId); - int32_t epoch = ntohl(pReq->epoch); - - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pNode, consumerId); - if (pConsumer == NULL) { - terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; - } - // TODO add lock - ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); - int32_t serverEpoch = pConsumer->epoch; - - // TODO - int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); - mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, - hbStatus); - atomic_store_32(&pConsumer->hbStatus, 0); - /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ - /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ - /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/ - - strcpy(rsp.cgroup, pReq->cgroup); - if (epoch != serverEpoch) { - mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, - serverEpoch); - mDebug("consumer %ld try r lock", consumerId); - taosRLockLatch(&pConsumer->lock); - mDebug("consumer %ld r locked", consumerId); - SArray *pTopics = pConsumer->currentTopics; - int32_t sz = taosArrayGetSize(pTopics); - rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); - for (int32_t i = 0; i < sz; i++) { - char *topicName = taosArrayGetP(pTopics, i); - SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); - ASSERT(pSub); - int32_t csz = taosArrayGetSize(pSub->consumers); - // TODO: change to bsearch - for (int32_t j = 0; j < csz; j++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); - if (consumerId == pSubConsumer->consumerId) { - int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - mInfo("topic %s has %d vg", topicName, serverEpoch); - - SMqSubTopicEp topicEp; - strcpy(topicEp.topic, topicName); - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName); - ASSERT(pTopic != NULL); - topicEp.schema = pTopic->schema; - mndReleaseTopic(pMnode, pTopic); - - topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); - for (int32_t k = 0; k < vgsz; k++) { - char offsetKey[TSDB_PARTITION_KEY_LEN]; - SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); - SMqSubVgEp vgEp = { - .epSet = pConsumerEp->epSet, - .vgId = pConsumerEp->vgId, - .offset = -1, - }; - mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId); - SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey); - if (pOffsetObj != NULL) { - vgEp.offset = pOffsetObj->offset; - mndReleaseOffset(pMnode, pOffsetObj); - } - taosArrayPush(topicEp.vgs, &vgEp); - } - taosArrayPush(rsp.topics, &topicEp); - break; - } - } - mndReleaseSubscribe(pMnode, pSub); - } - taosRUnLockLatch(&pConsumer->lock); - mDebug("consumer %ld r unlock", consumerId); - } - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); - void *buf = rpcMallocCont(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; - ((SMqRspHead *)buf)->epoch = serverEpoch; - ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; - - void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); - tDeleteSMqCMGetSubEpRsp(&rsp); - mndReleaseConsumer(pMnode, pConsumer); - pMsg->pRsp = buf; - pMsg->rspLen = tlen; - return 0; -} -#endif - static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) { int32_t i = 0; while (key[i] != TMQ_SEPARATOR) { @@ -433,235 +176,6 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebSub; } -#if 0 -static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { - SMnode *pMnode = pMsg->pNode; - SSdb *pSdb = pMnode->pSdb; - SMqConsumerObj *pConsumer; - void *pIter = NULL; - SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); - pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); - - while (1) { - pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); - if (pIter == NULL) break; - int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); - if (hbStatus > MND_SUBSCRIBE_REBALANCE_CNT) { - int32_t old = - atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); - if (old == MQ_CONSUMER_STATUS__ACTIVE) { - // get all topics of that topic - int32_t sz = taosArrayGetSize(pConsumer->currentTopics); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - char key[TSDB_SUBSCRIBE_KEY_LEN]; - mndMakeSubscribeKey(key, pConsumer->cgroup, topic); - SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); - taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId); - } - } - } - int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) { - SArray *rebSubs; - if (status == MQ_CONSUMER_STATUS__INIT) { - rebSubs = pConsumer->currentTopics; - } else { - rebSubs = pConsumer->recentRemovedTopics; - } - int32_t sz = taosArrayGetSize(rebSubs); - for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(rebSubs, i); - char key[TSDB_SUBSCRIBE_KEY_LEN]; - mndMakeSubscribeKey(key, pConsumer->cgroup, topic); - SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); - if (status == MQ_CONSUMER_STATUS__INIT) { - taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); - } else if (status == MQ_CONSUMER_STATUS__MODIFY) { - taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); - } - } - if (status == MQ_CONSUMER_STATUS__MODIFY) { - int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics); - for (int32_t i = 0; i < removeSz; i++) { - char *topicName = taosArrayGetP(pConsumer->recentRemovedTopics, i); - taosMemoryFree(topicName); - } - taosArrayClear(pConsumer->recentRemovedTopics); - } - } - } - if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { - mInfo("mq rebalance will be triggered"); - SRpcMsg rpcMsg = { - .msgType = TDMT_MND_MQ_DO_REBALANCE, - .pCont = pRebMsg, - .contLen = sizeof(SMqDoRebalanceMsg), - }; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - } else { - taosHashCleanup(pRebMsg->rebSubHash); - rpcFreeCont(pRebMsg); - } - return 0; -} -#endif - -#if 0 -static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { - SMnode *pMnode = pMsg->pNode; - SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); - void *pIter = NULL; - - mInfo("mq rebalance start"); - - while (1) { - pIter = taosHashIterate(pReq->rebSubHash, pIter); - if (pIter == NULL) break; - SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); - - mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, - (int32_t)taosArrayGetSize(pSub->unassignedVg)); - - // remove lost consumer - for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { - int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); - - mInfo("mq remove lost consumer %" PRId64 "", lostConsumerId); - - for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); - if (pSubConsumer->consumerId == lostConsumerId) { - taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); - taosArrayPush(pSub->lostConsumers, pSubConsumer); - taosArrayRemove(pSub->consumers, j); - break; - } - } - } - - // calculate rebalance - int32_t consumerNum = taosArrayGetSize(pSub->consumers); - if (consumerNum != 0) { - int32_t vgNum = pSub->vgNum; - int32_t vgEachConsumer = vgNum / consumerNum; - int32_t imbalanceVg = vgNum % consumerNum; - - // iterate all consumers, set unassignedVgStash - for (int32_t i = 0; i < consumerNum; i++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int32_t vgThisConsumerAfterRb; - if (i < imbalanceVg) - vgThisConsumerAfterRb = vgEachConsumer + 1; - else - vgThisConsumerAfterRb = vgEachConsumer; - - mInfo("mq consumer:%" PRId64 ", connectted vgroup number change from %d to %d", pSubConsumer->consumerId, - vgThisConsumerBeforeRb, vgThisConsumerAfterRb); - - while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) { - SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo); - ASSERT(pConsumerEp != NULL); - ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId); - taosArrayPush(pSub->unassignedVg, pConsumerEp); - mDebug("mq rebalance: vg %d push to unassignedVg", pConsumerEp->vgId); - } - - SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId); - mDebug("consumer %ld try w lock", pRebConsumer->consumerId); - taosWLockLatch(&pRebConsumer->lock); - mDebug("consumer %ld w locked", pRebConsumer->consumerId); - int32_t status = atomic_load_32(&pRebConsumer->status); - if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || - (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || - (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { - /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/ - /*pRebConsumer->epoch++;*/ - /*}*/ - if (vgThisConsumerAfterRb != 0) { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); - } else { - atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); - } - - mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, - pRebConsumer->status); - - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - mndTransAppendCommitlog(pTrans, pConsumerRaw); - } - taosWUnLockLatch(&pRebConsumer->lock); - mDebug("consumer %ld w unlock", pRebConsumer->consumerId); - mndReleaseConsumer(pMnode, pRebConsumer); - } - - // assign to vgroup - if (taosArrayGetSize(pSub->unassignedVg) != 0) { - for (int32_t i = 0; i < consumerNum; i++) { - SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int32_t vgThisConsumerAfterRb; - if (i < imbalanceVg) - vgThisConsumerAfterRb = vgEachConsumer + 1; - else - vgThisConsumerAfterRb = vgEachConsumer; - - while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) { - SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg); - mDebug("mq rebalance: vg %d pop from unassignedVg", pConsumerEp->vgId); - ASSERT(pConsumerEp != NULL); - - pConsumerEp->oldConsumerId = pConsumerEp->consumerId; - pConsumerEp->consumerId = pSubConsumer->consumerId; - // TODO - pConsumerEp->epoch = 0; - taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); - - char topic[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pSub->key, topic, cgroup); - if (pConsumerEp->oldConsumerId == -1) { - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId, - topic, pConsumerEp->consumerId, cgroup); - - mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); - mndReleaseTopic(pMnode, pTopic); - } else { - mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", - pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); - - mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, topic); - } - } - } - } - ASSERT(taosArrayGetSize(pSub->unassignedVg) == 0); - - // TODO: log rebalance statistics - SSdbRaw *pSubRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); - mndTransAppendRedolog(pTrans, pSubRaw); - } - mndReleaseSubscribe(pMnode, pSub); - } - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - taosHashCleanup(pReq->rebSubHash); - mndTransDrop(pTrans); - return -1; - } - - taosHashCleanup(pReq->rebSubHash); - mndTransDrop(pTrans); - return 0; -} -#endif - static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { if (pInput->pTopic != NULL) { // create subscribe @@ -825,36 +339,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pRebVg->newConsumerId = pEpInSub->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); } - -#if 0 - /*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/ - if (imbCnt < imbConsumerNum) { - imbCnt++; - // push until equal minVg + 1 - while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt + 1) { - // iter hash and find one vg - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - ASSERT(pRemovedIter); - pRebVg = (SMqRebOutputVg *)pRemovedIter; - // push - taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); - pRebVg->newConsumerId = pEpInSub->consumerId; - taosArrayPush(pOutput->rebVgs, pRebVg); - } - } else { - // push until equal minVg - while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) { - // iter hash and find one vg - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - ASSERT(pRemovedIter); - pRebVg = (SMqRebOutputVg *)pRemovedIter; - // push - taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); - pRebVg->newConsumerId = pEpInSub->consumerId; - taosArrayPush(pOutput->rebVgs, pRebVg); - } - } -#endif } // 7. handle unassigned vg @@ -1040,52 +524,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { return 0; } -static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pConsumerEp) { - ASSERT(pConsumerEp->oldConsumerId == -1); - int32_t vgId = pConsumerEp->vgId; - - SMqSetCVgReq req = { - .vgId = vgId, - .consumerId = pConsumerEp->consumerId, - .sql = pTopic->sql, - .physicalPlan = pTopic->physicalPlan, - .qmsg = pConsumerEp->qmsg, - }; - - strcpy(req.cgroup, cgroup); - strcpy(req.topicName, pTopic->name); - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *buf = taosMemoryMalloc(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - SMsgHead *pMsgHead = (SMsgHead *)buf; - - pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); - pMsgHead->vgId = htonl(vgId); - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncodeSMqSetCVgReq(&abuf, &req); - - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = buf; - action.contLen = sizeof(SMsgHead) + tlen; - action.msgType = TDMT_VND_MQ_SET_CONN; - - mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - return -1; - } - return 0; -} - void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b9c42fe899..22b1b404bb 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -76,7 +76,13 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->subDbUid, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER); + SDB_SET_INT8(pRaw, dataPos, pTopic->withTagSchema, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER); @@ -134,7 +140,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); + SDB_GET_INT64(pRaw, dataPos, &pTopic->subDbUid, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER); + SDB_GET_INT8(pRaw, dataPos, &pTopic->withTagSchema, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char)); @@ -254,33 +266,13 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; return -1; } - return 0; -} - -#if 0 -static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) { - if (NULL == pCreate->ast) { - return TSDB_CODE_SUCCESS; - } - - SNode *pAst = NULL; - int32_t code = nodesStringToNode(pCreate->ast, &pAst); - SQueryPlan *pPlan = NULL; - if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; - code = qCreateQueryPlan(&cxt, &pPlan, NULL); - } - - if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString(pPlan, false, pStr, NULL); + if ((pCreate->ast == NULL || pCreate->ast[0] == 0) && pCreate->subscribeDbName[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; } - nodesDestroyNode(pAst); - nodesDestroyNode(pPlan); - terrno = code; - return code; + return 0; } -#endif static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); @@ -297,28 +289,38 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; - SNode *pAst = NULL; - if (nodesStringToNode(pCreate->ast, &pAst) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + if (pCreate->ast && pCreate->ast[0]) { + topicObj.subType = TOPIC_SUB_TYPE__TABLE; + topicObj.withTbName = 0; + topicObj.withSchema = 0; - SQueryPlan *pPlan = NULL; + SNode *pAst = NULL; + if (nodesStringToNode(pCreate->ast, &pAst) != 0) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } - SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; - if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + SQueryPlan *pPlan = NULL; - if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; + if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } - if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + + if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + } else { + topicObj.subType = TOPIC_SUB_TYPE__DB; + topicObj.withTbName = 1; + topicObj.withSchema = 1; } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c7c8054120..510dd32459 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -618,127 +618,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -#if 0 -int32_t tqProcessRebReq(STQ* pTq, char* msg) { - SMqMVRebReq req = {0}; - terrno = TSDB_CODE_SUCCESS; - tDecodeSMqMVRebReq(msg, &req); - - vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId); - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); - ASSERT(pConsumer); - ASSERT(pConsumer->consumerId == req.oldConsumerId); - int32_t numOfTopics = taosArrayGetSize(pConsumer->topics); - if (numOfTopics == 1) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); - ASSERT(strcmp(pTopic->topicName, req.topic) == 0); - STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId); - if (pNewConsumer == NULL) { - pConsumer->consumerId = req.newConsumerId; - tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.newConsumerId); - tqHandlePurge(pTq->tqMeta, req.oldConsumerId); - return 0; - } else { - taosArrayPush(pNewConsumer->topics, pTopic); - } - } else { - for (int32_t i = 0; i < numOfTopics; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - if (strcmp(pTopic->topicName, req.topic) == 0) { - STqConsumer* pNewConsumer = tqHandleGet(pTq->tqMeta, req.newConsumerId); - if (pNewConsumer == NULL) { - pNewConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); - if (pNewConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - strcpy(pNewConsumer->cgroup, pConsumer->cgroup); - pNewConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); - pNewConsumer->consumerId = req.newConsumerId; - pNewConsumer->epoch = 0; - - taosArrayPush(pNewConsumer->topics, pTopic); - tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.newConsumerId); - return 0; - } - ASSERT(pNewConsumer->consumerId == req.newConsumerId); - taosArrayPush(pNewConsumer->topics, pTopic); - break; - } - } - // - } - return 0; -} - -int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { - SMqSetCVgReq req = {0}; - tDecodeSMqSetCVgReq(msg, &req); - bool create = false; - - vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId); - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId); - if (pConsumer == NULL) { - pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); - if (pConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - strcpy(pConsumer->cgroup, req.cgroup); - pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); - pConsumer->consumerId = req.consumerId; - pConsumer->epoch = 0; - create = true; - } - - STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic)); - if (pTopic == NULL) { - taosArrayDestroy(pConsumer->topics); - taosMemoryFree(pConsumer); - return -1; - } - strcpy(pTopic->topicName, req.topicName); - pTopic->sql = req.sql; - pTopic->physicalPlan = req.physicalPlan; - pTopic->qmsg = req.qmsg; - /*pTopic->committedOffset = -1;*/ - /*pTopic->currentOffset = -1;*/ - - pTopic->buffer.firstOffset = -1; - pTopic->buffer.lastOffset = -1; - pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); - if (pTopic->pReadhandle == NULL) { - ASSERT(false); - } - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - pTopic->buffer.output[i].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); - SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnodeMeta, - }; - pTopic->buffer.output[i].pReadHandle = pReadHandle; - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); - ASSERT(pTopic->buffer.output[i].task); - } - vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode)); - taosArrayPush(pConsumer->topics, pTopic); - if (create) { - tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); - tqHandleCommit(pTq->tqMeta, req.consumerId); - } - terrno = TSDB_CODE_SUCCESS; - return 0; -} - -int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) { - terrno = TSDB_CODE_SUCCESS; - return 0; -} -#endif - int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { if (pTask->execType == TASK_EXEC__NONE) return 0; -- GitLab