From 00ccbb4a3a87bd9d91c0cbbdd724ca36713fb2a8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 19 Jan 2022 16:28:13 +0800 Subject: [PATCH] add subscribe for sdb --- include/common/tmsg.h | 24 + include/dnode/mnode/sdb/sdb.h | 4 +- include/util/tdef.h | 1 + source/dnode/mnode/impl/inc/mndConsumer.h | 3 + source/dnode/mnode/impl/inc/mndDef.h | 238 +++++++- source/dnode/mnode/impl/inc/mndSubscribe.h | 38 ++ source/dnode/mnode/impl/src/mndConsumer.c | 267 ++------- source/dnode/mnode/impl/src/mndProfile.c | 3 + source/dnode/mnode/impl/src/mndSubscribe.c | 597 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 14 +- source/dnode/mnode/sdb/src/sdbHash.c | 4 +- 11 files changed, 934 insertions(+), 259 deletions(-) create mode 100644 source/dnode/mnode/impl/inc/mndSubscribe.h create mode 100644 source/dnode/mnode/impl/src/mndSubscribe.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dfd376f1e9..7d9fd53e69 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -324,6 +324,30 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { return buf; } +typedef struct SMqSetCVgReq { + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqSetCVgReq; + +static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeString(buf, pReq->topicName); + tlen += taosEncodeString(buf, pReq->cGroup); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { + buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeStringTo(buf, pReq->topicName); + buf = taosDecodeStringTo(buf, pReq->cGroup); + return buf; +} + typedef struct { int32_t vgId; char* dbName; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 5a4ac6a96f..7b022dd7c7 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -113,8 +113,8 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_CONSUMER = 10, - SDB_CGROUP = 11, + SDB_SUBSCRIBE = 10, + SDB_CONSUMER = 11, SDB_TOPIC = 12, SDB_VGROUP = 13, SDB_STB = 14, diff --git a/include/util/tdef.h b/include/util/tdef.h index 428de5d171..16ae60c231 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -178,6 +178,7 @@ do { \ #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_CONSUMER_GROUP_LEN 192 +#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 68ba08b66e..9d1dd084ee 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); +SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); +SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a2d6bbf4e6..a069987368 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -326,17 +326,156 @@ typedef struct SMqTopicConsumer { #endif typedef struct SMqConsumerEp { - int32_t vgId; + int32_t vgId; // -1 for unassigned SEpSet epset; - int64_t consumerId; + int64_t consumerId; // -1 for unassigned + int64_t lastConsumerHbTs; + int64_t lastVgHbTs; } SMqConsumerEp; -typedef struct SMqCgroupTopicPair { - char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; - SArray* assigned; // SArray - SArray* unassignedConsumer; - SArray* unassignedVg; -} SMqCgroupTopicPair; +static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); + tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); + tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { + buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); + buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); + buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + return buf; +} + +//unit for rebalance +typedef struct SMqSubscribeObj { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + int32_t epoch; + //TODO: replace with priority queue + SArray* availConsumer; // SArray (consumerId) + SArray* assigned; // SArray + SArray* unassignedConsumer; // SArray + SArray* unassignedVg; // SArray +} SMqSubscribeObj; + +static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { + SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); + pSub->key[0] = 0; + pSub->epoch = 0; + if (pSub == NULL) { + return NULL; + } + pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); + if (pSub->availConsumer == NULL) { + free(pSub); + return NULL; + } + pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + free(pSub); + return NULL; + } + pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + taosArrayDestroy(pSub->unassignedConsumer); + free(pSub); + return NULL; + } + pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + taosArrayDestroy(pSub->unassignedConsumer); + taosArrayDestroy(pSub->unassignedVg); + free(pSub); + return NULL; + } + return NULL; +} + +static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pSub->key); + tlen += taosEncodeFixedI32(buf, pSub->epoch); + int32_t sz; + + sz = taosArrayGetSize(pSub->availConsumer); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i); + tlen += taosEncodeFixedI64(buf, *pConsumerId); + } + + sz = taosArrayGetSize(pSub->assigned); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + sz = taosArrayGetSize(pSub->unassignedConsumer); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + sz = taosArrayGetSize(pSub->unassignedVg); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + return tlen; +} + +static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) { + buf = taosDecodeStringTo(buf, pSub->key); + buf = taosDecodeFixedI32(buf, &pSub->epoch); + + int32_t sz; + + buf = taosDecodeFixedI32(buf, &sz); + pSub->assigned = taosArrayInit(sz, sizeof(int64_t)); + if (pSub->assigned == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + int64_t consumerId; + buf = taosDecodeFixedI64(buf, &consumerId); + taosArrayPush(pSub->assigned, &consumerId); + } + + buf = taosDecodeFixedI32(buf, &sz); + pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); + if (pSub->unassignedConsumer == NULL) { + taosArrayDestroy(pSub->assigned); + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp cEp; + buf = tDecodeSMqConsumerEp(buf, &cEp); + taosArrayPush(pSub->unassignedConsumer, &cEp); + } + + buf = taosDecodeFixedI32(buf, &sz); + pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); + if (pSub->unassignedVg == NULL) { + taosArrayDestroy(pSub->assigned); + taosArrayDestroy(pSub->unassignedConsumer); + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp cEp; + buf = tDecodeSMqConsumerEp(buf, &cEp); + taosArrayPush(pSub->unassignedVg, &cEp); + } + + return buf; +} typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; @@ -358,8 +497,8 @@ typedef struct SMqTopicObj { char *sql; char *logicalPlan; char *physicalPlan; - SHashObj *cgroups; // SHashObj - SHashObj *consumers; // SHashObj + //SHashObj *cgroups; // SHashObj + //SHashObj *consumers; // SHashObj } SMqTopicObj; // TODO: add cache and change name to id @@ -367,18 +506,93 @@ typedef struct SMqConsumerTopic { char name[TSDB_TOPIC_FNAME_LEN]; int32_t epoch; //TODO: replace with something with ep - SList *vgroups; // SList + //SList *vgroups; // SList + //vg assigned to the consumer on the topic SArray *pVgInfo; // SArray } SMqConsumerTopic; +static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { + SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic)); + if (pCTopic == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + strcpy(pCTopic->name, pTopic->name); + pCTopic->epoch = 0; + pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t)); + + int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg); + if (unassignedVgSz > 0) { + SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg); + pCEp->consumerId = consumerId; + taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId); + taosArrayPush(pSub->assigned, pCEp); + } + return pCTopic; +} + +static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pConsumerTopic->name); + tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); + int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); + tlen += taosEncodeFixedI32(buf, *pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) { + buf = taosDecodeStringTo(buf, pConsumerTopic->name); + buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic)); + for (int32_t i = 0; i < sz; i++) { + int32_t vgInfo; + buf = taosDecodeFixedI32(buf, &vgInfo); + taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo); + } + return buf; +} + typedef struct SMqConsumerObj { int64_t consumerId; SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray - SHashObj *topicHash; //SHashObj + //SHashObj *topicHash; //SHashObj } SMqConsumerObj; +static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); + tlen += taosEncodeString(buf, pConsumer->cgroup); + int32_t sz = taosArrayGetSize(pConsumer->topics); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i); + tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { + buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); + buf = taosDecodeStringTo(buf, pConsumer->cgroup); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj)); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerTopic cTopic; + buf = tDecodeSMqConsumerTopic(buf, &cTopic); + taosArrayPush(pConsumer->topics, &cTopic); + } + return buf; +} + typedef struct SMqSubConsumerObj { int64_t consumerUid; // if -1, unassigned SList *vgId; // SList diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h new file mode 100644 index 0000000000..b8e651e386 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_SUBSCRIBE_H_ +#define _TD_MND_SUBSCRIBE_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitSubscribe(SMnode *pMnode); +void mndCleanupSubscribe(SMnode *pMnode); + +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName); +void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); + +SSdbRaw *mndSubscribeActionEncode(SMqSubscribeObj *pSub); +SSdbRow *mndSubscribeActionDecode(SSdbRaw *pRaw); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_SUBSCRIBE_H_*/ diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index d27bf53a90..5cdd8e77bd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -30,24 +30,14 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); -static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); - int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_CONSUMER, .keyType = SDB_KEY_BINARY, @@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); return sdbSetTable(pMnode->pSdb, table); } void mndCleanupConsumer(SMnode *pMnode) {} -static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { - return 0; -} - -static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { - int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); +SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen); if (pRaw == NULL) goto CM_ENCODE_OVER; + void* buf = malloc(tlen); + if (buf == NULL) goto CM_ENCODE_OVER; + + void* abuf = buf; + tEncodeSMqConsumerObj(&abuf, pConsumer); + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER); + +#if 0 int32_t topicNum = taosArrayGetSize(pConsumer->topics); SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER); int32_t len = strlen(pConsumer->cgroup); @@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { /*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/ } } +#endif SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER); + terrno = TSDB_CODE_SUCCESS; + CM_ENCODE_OVER: if (terrno != 0) { mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); @@ -116,7 +112,7 @@ CM_ENCODE_OVER: return pRaw; } -static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { +SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; @@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { goto CONSUME_DECODE_OVER; } - int32_t size = sizeof(SMqConsumerObj); - SSdbRow *pRow = sdbAllocRow(size); + SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); if (pRow == NULL) goto CONSUME_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); if (pConsumer == NULL) goto CONSUME_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER); - int32_t len, topicNum; + int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER); + void* buf = malloc(len); + if (buf == NULL) goto CONSUME_DECODE_OVER; + + SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER); + + tDecodeSMqConsumerObj(buf, pConsumer); + + SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +#if 0 SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER); for (int i = 0; i < topicNum; i++) { int32_t topicLen; @@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { int32_t vgSize; SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); } +#endif CONSUME_DECODE_OVER: if (terrno != 0) { @@ -162,8 +168,6 @@ CONSUME_DECODE_OVER: return NULL; } - /*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/ - return pRow; } @@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { sdbRelease(pSdb, pConsumer); } -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq subscribe; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); - int64_t consumerId = subscribe.consumerId; - char *consumerGroup = subscribe.consumerGroup; - int32_t cgroupLen = strlen(consumerGroup); - - SArray *newSub = NULL; - int newTopicNum = subscribe.topicNum; - if (newTopicNum) { - newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); - } - SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic)); - if (pConsumerTopics == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - for (int i = 0; i < newTopicNum; i++) { - char *newTopicName = taosArrayGetP(newSub, i); - SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i]; - - strcpy(pConsumerTopic->name, newTopicName); - pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); - } - - taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum); - free(pConsumerTopics); - taosArraySortString(newSub, taosArrayCompareString); - - SArray *oldSub = NULL; - int oldTopicNum = 0; - // create consumer if not exist - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); - if (pConsumer == NULL) { - // create consumer - pConsumer = malloc(sizeof(SMqConsumerObj)); - if (pConsumer == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pConsumer->consumerId = consumerId; - strcpy(pConsumer->cgroup, consumerGroup); - - } else { - oldSub = pConsumer->topics; - oldTopicNum = taosArrayGetSize(oldSub); - } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - //TODO: free memory - return -1; - } - - int i = 0, j = 0; - while (i < newTopicNum || j < oldTopicNum) { - SMqConsumerTopic *pOldTopic = NULL; - SMqConsumerTopic *pNewTopic = NULL; - if (i >= newTopicNum) { - // encode unset topic msg to all vnodes related to that topic - pOldTopic = taosArrayGet(oldSub, j); - j++; - } else if (j >= oldTopicNum) { - pNewTopic = taosArrayGet(newSub, i); - i++; - } else { - pNewTopic = taosArrayGet(newSub, i); - pOldTopic = taosArrayGet(oldSub, j); - - char *newName = pNewTopic->name; - char *oldName = pOldTopic->name; - int comp = compareLenPrefixedStr(newName, oldName); - if (comp == 0) { - // do nothing - pOldTopic = pNewTopic = NULL; - i++; - j++; - continue; - } else if (comp < 0) { - pOldTopic = NULL; - i++; - } else { - pNewTopic = NULL; - j++; - } - } - - if (pOldTopic != NULL) { - //cancel subscribe of that old topic - ASSERT(pNewTopic == NULL); - char *oldTopicName = pOldTopic->name; - SList *vgroups = pOldTopic->vgroups; - SListIter iter; - tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); - SListNode *pn; - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); - ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - while ((pn = tdListNext(&iter)) != NULL) { - int32_t vgId = *(int64_t *)pn->data; - // acquire and get epset - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO what time to release? - if (pVgObj == NULL) { - // TODO handle error - continue; - } - //build reset msg - void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); - // TODO:serialize - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = pMqVgSetReq; - action.contLen = 0; // TODO - action.msgType = TDMT_VND_MQ_SET_CONN; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMqVgSetReq); - mndTransDrop(pTrans); - // TODO free - return -1; - } - } - //delete data in mnode - taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); - mndReleaseTopic(pMnode, pTopic); - - } else if (pNewTopic != NULL) { - // save subscribe info to mnode - ASSERT(pOldTopic == NULL); - - char *newTopicName = pNewTopic->name; - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); - ASSERT(pTopic != NULL); - - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - if (pGroup == NULL) { - // add new group - pGroup = malloc(sizeof(SMqCGroup)); - if (pGroup == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->consumerIds = tdListNew(sizeof(int64_t)); - if (pGroup->consumerIds == NULL) { - free(pGroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->status = 0; - // add into cgroups - taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); - } - /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ - - // put the consumer into list - // rebalance will be triggered by timer - tdListAppend(pGroup->consumerIds, &consumerId); - - SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); - sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); - // TODO: error handling - mndTransAppendRedolog(pTrans, pTopicRaw); - - mndReleaseTopic(pMnode, pTopic); - - } else { - ASSERT(0); - } - } - // destroy old sub - taosArrayDestroy(oldSub); - // put new sub into consumerobj - pConsumer->topics = newSub; - - // persist consumerObj - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - // TODO: error handling - mndTransAppendRedolog(pTrans, pConsumerRaw); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - // TODO: free memory - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return 0; -} - -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } - +#if 0 static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; STableInfoReq *pInfo = pMsg->rpcMsg.pCont; mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); -#if 0 SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { pMsg->contLen = contLen; mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); -#endif return 0; } @@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } +#endif diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3773750ed3..22fdfde2ac 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { } static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { +#if 0 SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { pRsp->body = buf; pRsp->bodyLen = tlen; return pRsp; +#endif + return NULL; } static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c new file mode 100644 index 0000000000..1aca2e9894 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -0,0 +1,597 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndConsumer.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndSubscribe.h" +#include "mndTopic.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tcompare.h" +#include "tname.h" + +#define MND_SUBSCRIBE_VER_NUMBER 1 +#define MND_SUBSCRIBE_RESERVE_SIZE 64 + +static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); +static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); +static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); +static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); +static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); + +int32_t mndInitSubscribe(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_SUBSCRIBE, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndSubActionEncode, + .decodeFp = (SdbDecodeFp)mndSubActionDecode, + .insertFp = (SdbInsertFp)mndSubActionInsert, + .updateFp = (SdbUpdateFp)mndSubActionUpdate, + .deleteFp = (SdbDeleteFp)mndSubActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ + /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + return sdbSetTable(pMnode->pSdb, table); +} + +static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { + SMqConsumerEp CEp; + CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; + int32_t sz; + SVgObj *pVgroup = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); + while (pIter != NULL) { + if (pVgroup->dbUid == pTopic->dbUid) { + CEp.epset = mndGetVgroupEpset(pMnode, pVgroup); + CEp.vgId = pVgroup->vgId; + taosArrayPush(unassignedVg, &CEp); + } + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + } + return 0; +} + +static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, + SMqConsumerTopic *pConsumerTopic) { + int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + for (int32_t i = 0; i < sz; i++) { + int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + SMqSetCVgReq req = { + .vgId = vgId, + .consumerId = pConsumer->consumerId, + }; + strcpy(req.cGroup, pConsumer->cgroup); + strcpy(req.topicName, pConsumerTopic->name); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *reqStr = malloc(tlen); + if (reqStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = reqStr; + tEncodeSMqSetCVgReq(abuf, &req); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = reqStr; + action.contLen = tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + + mndReleaseVgroup(pMnode, pVgObj); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(reqStr); + return -1; + } + } + return 0; +} + +void mndCleanupSubscribe(SMnode *pMnode) {} + +static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { + int32_t tlen = tEncodeSubscribeObj(NULL, pSub); + int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); + if (pRaw == NULL) goto SUB_ENCODE_OVER; + + void *buf = malloc(tlen); + if (buf == NULL) { + goto SUB_ENCODE_OVER; + } + void *abuf = buf; + + tEncodeSubscribeObj(&buf, pSub); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER); + SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); + +SUB_ENCODE_OVER: + if (terrno != 0) { + mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub); + return pRaw; +} + +static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; + + if (sver != MND_SUBSCRIBE_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto SUB_DECODE_OVER; + } + + int32_t size = sizeof(SMqSubscribeObj); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto SUB_DECODE_OVER; + + SMqSubscribeObj *pSub = sdbGetRowObj(pRow); + if (pSub == NULL) goto SUB_DECODE_OVER; + + int32_t dataPos = 0; + int32_t tlen; + void *buf = malloc(tlen + 1); + if (buf == NULL) goto SUB_DECODE_OVER; + SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); + + if (tDecodeSubscribeObj(buf, pSub) == NULL) { + goto SUB_DECODE_OVER; + } + +SUB_DECODE_OVER: + if (terrno != 0) { + mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); + // TODO free subscribeobj + tfree(pRow); + return NULL; + } + + return pRow; +} + +static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { + mTrace("subscribe:%s, perform insert action", pSub->key); + return 0; +} + +static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { + mTrace("subscribe:%s, perform delete action", pSub->key); + return 0; +} + +static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) { + mTrace("subscribe:%s, perform update action", pOldSub->key); + return 0; +} + +static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { + return 0; +} + +static char *mndMakeSubscribeKey(char *cgroup, char *topicName) { + char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); + if (key == NULL) { + return NULL; + } + int tlen = strlen(cgroup); + memcpy(key, cgroup, tlen); + key[tlen] = ':'; + strcpy(key + tlen + 1, topicName); + return key; +} + +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) { + SSdb *pSdb = pMnode->pSdb; + char *key = mndMakeSubscribeKey(cgroup, topicName); + SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); + free(key); + if (pSub == NULL) { + /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + } + return pSub; +} + +void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pSub); +} + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMSubscribeReq subscribe; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int64_t consumerId = subscribe.consumerId; + char *consumerGroup = subscribe.consumerGroup; + int32_t cgroupLen = strlen(consumerGroup); + + SArray *newSub = subscribe.topicNames; + int newTopicNum = subscribe.topicNum; + + taosArraySortString(newSub, taosArrayCompareString); + + SArray *oldSub = NULL; + int oldTopicNum = 0; + // create consumer if not exist + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pConsumer == NULL) { + // create consumer + pConsumer = malloc(sizeof(SMqConsumerObj)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pConsumer->consumerId = consumerId; + strcpy(pConsumer->cgroup, consumerGroup); + taosInitRWLatch(&pConsumer->lock); + } else { + oldSub = pConsumer->topics; + } + pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); + + if (oldSub != NULL) { + oldTopicNum = taosArrayGetSize(oldSub); + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + if (pTrans == NULL) { + // TODO: free memory + return -1; + } + + int i = 0, j = 0; + while (i < newTopicNum || j < oldTopicNum) { + char* newTopicName = NULL; + char* oldTopicName = NULL; + if (i >= newTopicNum) { + // encode unset topic msg to all vnodes related to that topic + oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name; + j++; + } else if (j >= oldTopicNum) { + newTopicName = taosArrayGet(newSub, i); + i++; + } else { + newTopicName = taosArrayGet(newSub, i); + oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name; + + int comp = compareLenPrefixedStr(newTopicName, oldTopicName); + if (comp == 0) { + // do nothing + oldTopicName = newTopicName = NULL; + i++; + j++; + continue; + } else if (comp < 0) { + oldTopicName = NULL; + i++; + } else { + newTopicName = NULL; + j++; + } + } + + if (oldTopicName != NULL) { +#if 0 + // cancel subscribe of that old topic + ASSERT(pNewTopic == NULL); + char *oldTopicName = pOldTopic->name; + SList *vgroups = pOldTopic->vgroups; + SListIter iter; + tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); + SListNode *pn; + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); + ASSERT(pTopic != NULL); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); + while ((pn = tdListNext(&iter)) != NULL) { + int32_t vgId = *(int64_t *)pn->data; + // acquire and get epset + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + // TODO what time to release? + if (pVgObj == NULL) { + // TODO handle error + continue; + } + // build reset msg + void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); + // TODO:serialize + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = pMqVgSetReq; + action.contLen = 0; // TODO + action.msgType = TDMT_VND_MQ_SET_CONN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMqVgSetReq); + mndTransDrop(pTrans); + // TODO free + return -1; + } + } + // delete data in mnode + taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); + mndReleaseSubscribe(pMnode, pSub); + mndReleaseTopic(pMnode, pTopic); +#endif + } else if (newTopicName != NULL) { + // save subscribe info to mnode + ASSERT(oldTopicName == NULL); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); + if (pTopic == NULL) { + /*terrno = */ + continue; + } + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); + if (pSub == NULL) { + pSub = tNewSubscribeObj(); + if (pSub == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + // set unassigned vg + mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); + } + SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); + taosArrayPush(pConsumer->topics, pConsumerTopic); + + if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { + int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); + // send setmsg to vnode + if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) { + // TODO + return -1; + } + } + taosArrayDestroy(pConsumerTopic->pVgInfo); + free(pConsumerTopic); +#if 0 + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); + if (pGroup == NULL) { + // add new group + pGroup = malloc(sizeof(SMqCGroup)); + if (pGroup == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->consumerIds = tdListNew(sizeof(int64_t)); + if (pGroup->consumerIds == NULL) { + free(pGroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->status = 0; + // add into cgroups + taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); + } + /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ + + // put the consumer into list + // rebalance will be triggered by timer + tdListAppend(pGroup->consumerIds, &consumerId); + + SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); + sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pTopicRaw); + +#endif + mndReleaseTopic(pMnode, pTopic); + mndReleaseSubscribe(pMnode, pSub); + } + } + // part3. persist consumerObj + + // destroy old sub + if (oldSub) taosArrayDestroy(oldSub); + // put new sub into consumerobj + + // persist consumerObj + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pConsumerRaw); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return -1; + } + + // TODO: free memory + if (newSub) taosArrayDestroy(newSub); + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return 0; +} + +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STableInfoReq *pInfo = pMsg->rpcMsg.pCont; + + mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname); + +#if 0 + SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); + if (pConsumer == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_CONSUMER; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + taosRLockLatch(&pConsumer->lock); + int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; + int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); + + STableMetaRsp *pMeta = rpcMallocCont(contLen); + if (pMeta == NULL) { + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); + pMeta->numOfTags = htonl(pConsumer->numOfTags); + pMeta->numOfColumns = htonl(pConsumer->numOfColumns); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = TSDB_SUPER_TABLE; + pMeta->update = pDb->cfg.update; + pMeta->sversion = htonl(pConsumer->version); + pMeta->tuid = htonl(pConsumer->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pMeta->pSchema[i]; + SSchema *pSrcSchema = &pConsumer->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + + pMsg->pCont = pMeta; + pMsg->contLen = contLen; + + mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); +#endif + return 0; +} + +static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { + SSdb *pSdb = pMnode->pSdb; + + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfConsumers = 0; + void *pIter = NULL; + while (1) { + SMqConsumerObj *pConsumer = NULL; + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + + numOfConsumers++; + + sdbRelease(pSdb, pConsumer); + } + + *pNumOfConsumers = numOfConsumers; + return 0; +} + +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htonl(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + + return 0; +} + +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 16a9828e71..1d4cbf37ce 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; - pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char)); - if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER; SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); @@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { TOPIC_ENCODE_OVER: if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); - /*if (pTopic->logicalPlan) {*/ - /*free(pTopic->logicalPlan);*/ - /*}*/ - /*if (pTopic->physicalPlan) {*/ - /*free(pTopic->physicalPlan);*/ - /*}*/ sdbFreeRaw(pRaw); return NULL; } @@ -138,7 +130,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - pTopic->logicalPlan = calloc(len+1, sizeof(char)); + pTopic->logicalPlan = calloc(len + 1, sizeof(char)); if (pTopic->logicalPlan == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; @@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - pTopic->logicalPlan = calloc(len + 1, sizeof(char)); + pTopic->physicalPlan = calloc(len + 1, sizeof(char)); if (pTopic->physicalPlan == NULL) { free(pTopic->logicalPlan); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); - SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 8fdb6b1657..a9267b0ea3 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) { return "auth"; case SDB_ACCT: return "acct"; + case SDB_SUBSCRIBE: + return "subscribe"; case SDB_CONSUMER: return "consumer"; - case SDB_CGROUP: - return "cgroup"; case SDB_TOPIC: return "topic"; case SDB_VGROUP: -- GitLab