未验证 提交 128b6138 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10595 from taosdata/feature/tq

subscribe key used stack mem
......@@ -746,7 +746,7 @@ typedef struct {
int32_t fsyncPeriod;
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
int8_t hashMethod;
int8_t walLevel;
int8_t precision;
int8_t compression;
......@@ -757,7 +757,7 @@ typedef struct {
int8_t selfIndex;
int8_t streamMode;
SReplica replicas[TSDB_MAX_REPLICA];
} SCreateVnodeReq, SAlterVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......@@ -1388,7 +1388,7 @@ typedef struct {
typedef struct SMqCMGetSubEpReq {
int64_t consumerId;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqCMGetSubEpReq;
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
......@@ -1688,7 +1688,7 @@ typedef struct {
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
......@@ -1751,7 +1751,7 @@ typedef struct {
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqSetCVgRsp;
typedef struct {
......@@ -1759,14 +1759,14 @@ typedef struct {
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqMVRebRsp;
typedef struct {
int32_t vgId;
int64_t offset;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
} SMqOffset;
typedef struct {
......@@ -2080,7 +2080,7 @@ typedef struct {
int64_t consumerId;
int64_t blockingTime;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
int64_t currentOffset;
char topic[TSDB_TOPIC_FNAME_LEN];
......@@ -2099,7 +2099,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
......
......@@ -195,6 +195,7 @@ typedef enum ELogicConditionType {
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
......@@ -210,9 +211,8 @@ typedef enum ELogicConditionType {
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......@@ -428,6 +428,8 @@ typedef struct {
int32_t primary;
} SDiskCfg;
#define TMQ_SEPARATOR ':'
#ifdef __cplusplus
}
#endif
......
......@@ -46,7 +46,7 @@ struct tmq_topic_vgroup_list_t {
struct tmq_conf_t {
char clientId[256];
char groupId[256];
char groupId[TSDB_CGROUP_LEN];
int8_t auto_commit;
int8_t resetOffset;
tmq_commit_cb* commit_cb;
......@@ -56,7 +56,7 @@ struct tmq_conf_t {
struct tmq_t {
// conf
char groupId[256];
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
int8_t inWaiting;
......
......@@ -594,7 +594,7 @@ typedef struct {
int64_t consumerId;
int64_t connId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char cgroup[TSDB_CGROUP_LEN];
SArray* currentTopics; // SArray<char*>
SArray* recentRemovedTopics; // SArray<char*>
int32_t epoch;
......
......@@ -43,7 +43,12 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return -1;
}
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
int32_t opNum = taosArrayGetSize(inner);
if (opNum != 1) {
return -1;
}
SSubplan* plan = taosArrayGetP(inner, 0);
void* pIter = NULL;
......
......@@ -40,7 +40,7 @@ enum {
MQ_SUBSCRIBE_STATUS__DELETED,
};
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName);
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
......@@ -88,15 +88,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char *key = mndMakeSubscribeKey(cgroup, pTopic->name);
if (key == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub);
free(pSub);
return NULL;
}
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, cgroup, pTopic->name);
strcpy(pSub->key, key);
free(key);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
......@@ -335,15 +329,14 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
int32_t i = 0;
while (key[i] != ':') {
while (key[i] != TMQ_SEPARATOR) {
i++;
}
key[i] = 0;
*cgroup = strdup(key);
key[i] = ':';
*topic = strdup(&key[i + 1]);
memcpy(topic, key, i - 1);
topic[i] = 0;
strcpy(cgroup, &key[i + 1]);
return 0;
}
......@@ -379,8 +372,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// get all topics of that topic
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
}
......@@ -396,8 +390,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
}
int32_t sz = taosArrayGetSize(rebSubs);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(rebSubs, i);
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
char *topic = taosArrayGetP(rebSubs, i);
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
if (status == MQ_CONSUMER_STATUS__INIT) {
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
......@@ -530,9 +525,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) {
char *topic;
char *cgroup;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
......@@ -540,8 +535,6 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic);
free(topic);
free(cgroup);
} else {
mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
......@@ -769,6 +762,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
}
#endif
#if 0
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
......@@ -814,6 +808,7 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
/*qDestroyQueryDag(pDag);*/
return 0;
}
#endif
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
const SMqConsumerEp *pConsumerEp) {
......@@ -959,23 +954,19 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return 0;
}
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
if (key == NULL) {
return NULL;
}
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
int32_t tlen = strlen(cgroup);
memcpy(key, cgroup, tlen);
key[tlen] = ':';
key[tlen] = TMQ_SEPARATOR;
strcpy(key + tlen + 1, topicName);
return key;
return 0;
}
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
SSdb *pSdb = pMnode->pSdb;
char *key = mndMakeSubscribeKey(cgroup, topicName);
SSdb *pSdb = pMnode->pSdb;
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, cgroup, topicName);
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
free(key);
if (pSub == NULL) {
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
}
......
......@@ -267,7 +267,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
// mnode-topic
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册