diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 65860d4959928aba2a28a620789351a0bef670cf..48827d1212338607d3f2c0ff438187ff45dcd78d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -746,7 +746,7 @@ typedef struct { int32_t fsyncPeriod; uint32_t hashBegin; uint32_t hashEnd; - int8_t hashMethod; + int8_t hashMethod; int8_t walLevel; int8_t precision; int8_t compression; @@ -757,7 +757,7 @@ typedef struct { int8_t selfIndex; int8_t streamMode; SReplica replicas[TSDB_MAX_REPLICA]; - + } SCreateVnodeReq, SAlterVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); @@ -1388,7 +1388,7 @@ typedef struct { typedef struct SMqCMGetSubEpReq { int64_t consumerId; int32_t epoch; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; } SMqCMGetSubEpReq; static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { @@ -1688,7 +1688,7 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; char* sql; char* logicalPlan; char* physicalPlan; @@ -1751,7 +1751,7 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; } SMqSetCVgRsp; typedef struct { @@ -1759,14 +1759,14 @@ typedef struct { int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; } SMqMVRebRsp; typedef struct { int32_t vgId; int64_t offset; char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; } SMqOffset; typedef struct { @@ -2080,7 +2080,7 @@ typedef struct { int64_t consumerId; int64_t blockingTime; int32_t epoch; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; int64_t currentOffset; char topic[TSDB_TOPIC_FNAME_LEN]; @@ -2099,7 +2099,7 @@ typedef struct { typedef struct { int64_t consumerId; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; diff --git a/include/util/tdef.h b/include/util/tdef.h index 9695c2e4c86de7440da41d5b5470656b947d484f..664588f68ff4973e14676924a6ba5647f262a43c 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -195,6 +195,7 @@ typedef enum ELogicConditionType { #define TSDB_NODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string +#define TSDB_CGROUP_LEN 193 // it is a null-terminated string #define TSDB_DB_NAME_LEN 65 #define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN) @@ -210,9 +211,8 @@ typedef enum ELogicConditionType { #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN -#define TSDB_CONSUMER_GROUP_LEN 192 -#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) -#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) +#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) +#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE @@ -428,6 +428,8 @@ typedef struct { int32_t primary; } SDiskCfg; +#define TMQ_SEPARATOR ':' + #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 35fc557eea310ae82135dc18df0df4033addcccc..10dc378518704001feebfa70ef368fff99c8b26d 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -46,7 +46,7 @@ struct tmq_topic_vgroup_list_t { struct tmq_conf_t { char clientId[256]; - char groupId[256]; + char groupId[TSDB_CGROUP_LEN]; int8_t auto_commit; int8_t resetOffset; tmq_commit_cb* commit_cb; @@ -56,7 +56,7 @@ struct tmq_conf_t { struct tmq_t { // conf - char groupId[256]; + char groupId[TSDB_CGROUP_LEN]; char clientId[256]; int8_t autoCommit; int8_t inWaiting; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 05f7de60ff948b708ab8a34215e255aa038ac2f6..a36168b622dcc124a4c47105be5d0877330c0377 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -594,7 +594,7 @@ typedef struct { int64_t consumerId; int64_t connId; SRWLatch lock; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char cgroup[TSDB_CGROUP_LEN]; SArray* currentTopics; // SArray SArray* recentRemovedTopics; // SArray int32_t epoch; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9b301ad98a3908af6c82f92759722dabc08ab0aa..5308c0c0f67cec64f6dbef68cdc9fd94828a8467 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -43,7 +43,12 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib return -1; } - SArray* inner = taosArrayGet(pDag->pSubplans, 0); + SArray* inner = taosArrayGet(pDag->pSubplans, 0); + + int32_t opNum = taosArrayGetSize(inner); + if (opNum != 1) { + return -1; + } SSubplan* plan = taosArrayGetP(inner, 0); void* pIter = NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 84360ab23b58bfa20c729e8f404465332e8b2d56..9acd881897de0b41d369cb71b7c14b8ff894c578 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -40,7 +40,7 @@ enum { MQ_SUBSCRIBE_STATUS__DELETED, }; -static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName); +static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); @@ -88,15 +88,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char *key = mndMakeSubscribeKey(cgroup, pTopic->name); - if (key == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tDeleteSMqSubscribeObj(pSub); - free(pSub); - return NULL; - } + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, cgroup, pTopic->name); strcpy(pSub->key, key); - free(key); if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; @@ -335,15 +329,14 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { return 0; } -static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { +static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) { int32_t i = 0; - while (key[i] != ':') { + while (key[i] != TMQ_SEPARATOR) { i++; } - key[i] = 0; - *cgroup = strdup(key); - key[i] = ':'; - *topic = strdup(&key[i + 1]); + memcpy(topic, key, i - 1); + topic[i] = 0; + strcpy(cgroup, &key[i + 1]); return 0; } @@ -379,8 +372,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // get all topics of that topic int32_t sz = taosArrayGetSize(pConsumer->currentTopics); for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pConsumer->currentTopics, i); - char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId); } @@ -396,8 +390,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } int32_t sz = taosArrayGetSize(rebSubs); for (int32_t i = 0; i < sz; i++) { - char *topic = taosArrayGetP(rebSubs, i); - char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); + char *topic = taosArrayGetP(rebSubs, i); + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); if (status == MQ_CONSUMER_STATUS__INIT) { taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); @@ -530,9 +525,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { - char *topic; - char *cgroup; - mndSplitSubscribeKey(pSub->key, &topic, &cgroup); + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic, @@ -540,8 +535,6 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndReleaseTopic(pMnode, pTopic); - free(topic); - free(cgroup); } else { mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); @@ -769,6 +762,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } #endif +#if 0 static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -814,6 +808,7 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM /*qDestroyQueryDag(pDag);*/ return 0; } +#endif static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp) { @@ -959,23 +954,19 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc return 0; } -static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { - char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); - if (key == NULL) { - return NULL; - } +static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) { int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); - key[tlen] = ':'; + key[tlen] = TMQ_SEPARATOR; strcpy(key + tlen + 1, topicName); - return key; + return 0; } SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) { - SSdb *pSdb = pMnode->pSdb; - char *key = mndMakeSubscribeKey(cgroup, topicName); + SSdb *pSdb = pMnode->pSdb; + char key[TSDB_SUBSCRIBE_KEY_LEN]; + mndMakeSubscribeKey(key, cgroup, topicName); SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); - free(key); if (pSub == NULL) { terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6cf6eeb37137ba8ab766339d7f2b0bd4bcd75381..c1cb4f8a410665fae27b4a24848c4de8ff3a7f30 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -267,7 +267,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill") // mnode-topic -TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported") // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")