提交 0a83ecac 编写于 作者: L Liu Jicong

add sub structure

上级 7f80c1e9
......@@ -320,14 +320,23 @@ typedef struct SEpSet {
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
if(buf == NULL) return sizeof(SEpSet);
memcpy(buf, pEp, sizeof(SEpSet));
//TODO: endian conversion
return sizeof(SEpSet);
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->port[i]);
tlen += taosEncodeString(buf, pEp->fqdn[i]);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEpSet) {
memcpy(pEpSet, buf, sizeof(SEpSet));
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->port[i]);
buf = taosDecodeStringTo(buf, pEp->fqdn[i]);
}
return buf;
}
......@@ -1088,16 +1097,16 @@ typedef struct STaskDropRsp {
} STaskDropRsp;
typedef struct {
int8_t igExists;
char* name;
char* physicalPlan;
char* logicalPlan;
int8_t igExists;
char* name;
char* physicalPlan;
char* logicalPlan;
} SCMCreateTopicReq;
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
int tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen;
......@@ -1127,41 +1136,62 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi
}
typedef struct {
char* topicName;
char* consumerGroup;
int32_t topicNum;
int64_t consumerId;
char* consumerGroup;
char* topicName[];
} SCMSubscribeReq;
static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
int tlen = 0;
tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->consumerGroup);
tlen += taosEncodeFixedI32(buf, pReq->topicNum);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) {
tlen += taosEncodeString(buf, pReq->topicName[i]);
}
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeString(buf, &pReq->topicName);
buf = taosDecodeString(buf, &pReq->consumerGroup);
buf = taosDecodeFixedI32(buf, &pReq->topicNum);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeString(buf, &pReq->consumerGroup);
for(int i = 0; i < pReq->topicNum; i++) {
buf = taosDecodeString(buf, &pReq->topicName[i]);
}
return buf;
}
typedef struct {
typedef struct SMqSubTopic {
int32_t vgId;
SEpSet pEpSet;
int64_t topicId;
SEpSet epSet;
} SMqSubTopic;
typedef struct {
int32_t topicNum;
SMqSubTopic topics[];
} SCMSubscribeRsp;
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pRsp->vgId);
tlen += taosEncodeSEpSet(buf, &pRsp->pEpSet);
tlen += taosEncodeFixedI32(buf, pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) {
tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId);
tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId);
tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
buf = taosDecodeFixedI32(buf, &pRsp->vgId);
buf = taosDecodeSEpSet(buf, &pRsp->pEpSet);
buf = taosDecodeFixedI32(buf, &pRsp->topicNum);
for(int i = 0; i < pRsp->topicNum; i++) {
buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId);
buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId);
buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet);
}
return buf;
}
......@@ -1170,10 +1200,36 @@ typedef struct {
int64_t consumerId;
int64_t consumerGroupId;
int64_t offset;
char* sql;
char* logicalPlan;
char* physicalPlan;
} SMVSubscribeReq;
static FORCE_INLINE int tSerializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->topicId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId);
tlen += taosEncodeFixedI64(buf, pReq->offset);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
return tlen;
}
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->topicId);
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId);
buf = taosDecodeFixedI64(buf, &pReq->offset);
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
return buf;
}
typedef struct {
int64_t newOffset;
int64_t status;
} SMVSubscribeRsp;
typedef struct {
......
......@@ -177,6 +177,7 @@ do { \
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
#define TSDB_CONSUMER_GROUP_LEN 192
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......
......@@ -22,6 +22,7 @@
#include "sync.h"
#include "tmsg.h"
#include "thash.h"
#include "tlist.h"
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
......@@ -288,46 +289,90 @@ typedef struct {
char payload[];
} SShowObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
typedef struct SConsumerObj {
uint64_t uid;
uint64_t dbUid;
int32_t version;
int64_t createTime;
int64_t updateTime;
//uint64_t dbUid;
int32_t version;
SRWLatch lock;
SList* topics;
} SConsumerObj;
typedef struct SMqSubConsumerObj {
int64_t consumerUid; // if -1, unassigned
SList* vgId; //SList<int32_t>
} SMqSubConsumerObj;
typedef struct SMqSubCGroupObj {
char name[TSDB_CONSUMER_GROUP_LEN];
SList* consumers; //SList<SMqConsumerObj>
} SMqSubCGroupObj;
typedef struct SMqSubTopicObj {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
char* logicalPlan;
char* physicalPlan;
} STopicObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
SList* cgroups; //SList<SMqSubCGroupObj>
} SMqSubTopicObj;
typedef struct SMqConsumerSubObj {
int64_t topicUid;
SList* vgIds; //SList<int64_t>
} SMqConsumerSubObj;
typedef struct SMqConsumerHbObj {
int64_t consumerId;
SList* consumerSubs; //SList<SMqConsumerSubObj>
} SMqConsumerHbObj;
typedef struct SMqVGroupSubObj {
int64_t topicUid;
SList* consumerIds; //SList<int64_t>
} SMqVGroupSubObj;
typedef struct SMqVGroupHbObj {
int64_t vgId;
SList* vgSubs; //SList<SMqVGroupSubObj>
} SMqVGroupHbObj;
typedef struct SCGroupObj {
char name[TSDB_TOPIC_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
//uint64_t dbUid;
int32_t version;
int32_t version;
SRWLatch lock;
} SConsumerObj;
SList* consumerIds;
} SCGroupObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
uint64_t uid;
//uint64_t dbUid;
int32_t version;
uint64_t dbUid;
int32_t version;
SRWLatch lock;
} SCGroupObj;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
char* logicalPlan;
char* physicalPlan;
SList* consumerIds;
} STopicObj;
typedef struct SMnodeMsg {
char user[TSDB_USER_LEN];
......
......@@ -66,17 +66,82 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
static SSdbRaw *mndCGroupActionEncode(SCGroupObj *pCGroup) {
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pCGroup->createTime);
SDB_SET_INT64(pRaw, dataPos, pCGroup->updateTime);
SDB_SET_INT64(pRaw, dataPos, pCGroup->uid);
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/
SDB_SET_INT32(pRaw, dataPos, pCGroup->version);
int32_t sz = listNEles(pCGroup->consumerIds);
SDB_SET_INT32(pRaw, dataPos, sz);
SListIter iter;
tdListInitIter(pCGroup->consumerIds, &iter, TD_LIST_FORWARD);
SListNode *pn = NULL;
while ((pn = tdListNext(&iter)) != NULL) {
int64_t consumerId = *(int64_t *)pn->data;
SDB_SET_INT64(pRaw, dataPos, consumerId);
}
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
}
static SSdbRow *mndCGroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != MND_CONSUMER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode cgroup since %s", terrstr());
return NULL;
}
// TODO: maximum size is not known
int32_t size = sizeof(SCGroupObj) + 128 * sizeof(int64_t);
SSdbRow *pRow = sdbAllocRow(size);
SCGroupObj *pCGroup = sdbGetRowObj(pRow);
if (pCGroup == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pCGroup->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->createTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->updateTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pCGroup->uid);
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/
SDB_GET_INT32(pRaw, pRow, dataPos, &pCGroup->version);
int32_t sz;
SDB_GET_INT32(pRaw, pRow, dataPos, &sz);
// TODO: free list when failing
tdListInit(pCGroup->consumerIds, sizeof(int64_t));
for (int i = 0; i < sz; i++) {
int64_t consumerId;
SDB_GET_INT64(pRaw, pRow, dataPos, &consumerId);
tdListAppend(pCGroup->consumerIds, &consumerId);
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE);
return pRow;
}
static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) {
int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid);
SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime);
SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime);
SDB_SET_INT64(pRaw, dataPos, pConsumer->uid);
/*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/
SDB_SET_INT32(pRaw, dataPos, pConsumer->version);
......@@ -102,11 +167,9 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
if (pConsumer == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN);
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime);
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid);
/*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/
SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version);
......@@ -116,17 +179,17 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
}
static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) {
mTrace("consumer:%s, perform insert action", pConsumer->name);
mTrace("consumer:%ld, perform insert action", pConsumer->uid);
return 0;
}
static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) {
mTrace("consumer:%s, perform delete action", pConsumer->name);
mTrace("consumer:%ld, perform delete action", pConsumer->uid);
return 0;
}
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) {
mTrace("consumer:%s, perform update action", pOldConsumer->name);
mTrace("consumer:%ld, perform update action", pOldConsumer->uid);
atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime);
atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version);
......@@ -157,9 +220,34 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq *pSubscribe;
tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
// add consumerGroupId -> list<consumerId> to sdb
// add consumerId -> list<consumer> to sdb
// add consumer -> list<consumerId> to sdb
int topicNum = pSubscribe->topicNum;
int64_t consumerId = pSubscribe->consumerId;
char *consumerGroup = pSubscribe->consumerGroup;
// get consumer group and add client into it
SCGroupObj *pCGroup = sdbAcquire(pMnode->pSdb, SDB_CGROUP, consumerGroup);
if (pCGroup != NULL) {
// iterate the list until finding the consumer
// add consumer to cgroup list if not found
// put new record
}
SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer != NULL) {
//reset topic list
}
for (int i = 0; i < topicNum; i++) {
char *topicName = pSubscribe->topicName[i];
STopicObj *pTopic = mndAcquireTopic(pMnode, topicName);
//get
// consumer id
SList *list = pTopic->consumerIds;
// add the consumer if not in the list
//
SList* topicList = pConsumer->topics;
//add to topic
}
return 0;
}
......@@ -255,9 +343,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
if (strcmp(pConsumer->db, dbName) == 0) {
numOfConsumers++;
}
numOfConsumers++;
sdbRelease(pSdb, pConsumer);
}
......@@ -316,11 +402,11 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMs
return 0;
}
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveCGroup(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SConsumerObj *pConsumer = NULL;
SCGroupObj *pCGroup = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
......@@ -330,36 +416,28 @@ static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pCGroup);
if (pShow->pIter == NULL) break;
if (strncmp(pConsumer->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pConsumer);
if (strncmp(pCGroup->name, prefix, prefixLen) != 0) {
sdbRelease(pSdb, pCGroup);
continue;
}
cols = 0;
char consumerName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(consumerName, pConsumer->name + prefixLen, TSDB_TABLE_NAME_LEN);
tstrncpy(consumerName, pCGroup->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, consumerName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pConsumer->createTime;
*(int64_t *)pWrite = pCGroup->createTime;
cols++;
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pConsumer->numOfColumns;*/
/*cols++;*/
/*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/
/**(int32_t *)pWrite = pConsumer->numOfTags;*/
/*cols++;*/
numOfRows++;
sdbRelease(pSdb, pConsumer);
sdbRelease(pSdb, pCGroup);
}
pShow->numOfReads += numOfRows;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册