提交 b0d935ba 编写于 作者: L Liu Jicong

merge from 3.0

上级 df554c03
......@@ -1192,7 +1192,7 @@ typedef struct {
char* physicalPlan;
} SMVSubscribeReq;
static FORCE_INLINE int tSerializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
static FORCE_INLINE int tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->topicId);
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
......
......@@ -78,41 +78,41 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t dataPos = 0;
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, topicNum);
SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t len;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
len = strlen(pConsumerTopic->name);
SDB_SET_INT32(pRaw, dataPos, len);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER);
int vgSize;
if (pConsumerTopic->vgroups == NULL) {
vgSize = 0;
} else {
vgSize = listNEles(pConsumerTopic->vgroups);
}
SDB_SET_INT32(pRaw, dataPos, vgSize);
SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER);
for (int j = 0; j < vgSize; j++) {
// SList* head;
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
CM_ENCODE_OVER:
if (terrno != 0) {
mError("consumer:%s, failed to encode to raw:%p since %s", pConsumer->name, pRaw, terrstr());
mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("consumer:%s, encode to raw:%p, row:%p", pConsumer->name, pRaw, pConsumer);
mTrace("consumer:%ld, encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
return pRaw;
}
......@@ -129,17 +129,17 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t size = sizeof(SMqConsumerObj);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto CONSUMER_DECODE_OVER;
if (pRow == NULL) goto CONSUME_DECODE_OVER;
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CONSUMER_DECODE_OVER;
if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
int32_t dataPos = 0;
SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER);
int32_t len, topicNum;
SDB_GET_INT32(pRaw, pRow, dataPos, &len, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER);
SDB_GET_INT32(pRaw, pRow, dataPos, &topicNum, CONSUME_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
......@@ -149,20 +149,21 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
return NULL;
}
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
SDB_GET_INT32(pRaw, pRow, dataPos, &topicLen, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
int32_t vgSize;
SDB_GET_INT32(pRaw, pRow, dataPos, &vgSize, CONSUME_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
CONSUME_DECODE_OVER:
if (terrno != 0) {
mError("consumer:%s, failed to decode from raw:%p since %s", pConsumer->name, pRaw, terrstr());
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
tfree(pRow);
return NULL;
}
SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE);
/*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/
return pRow;
}
......
......@@ -57,27 +57,26 @@ int32_t mndInitTopic(SMnode *pMnode) {
void mndCleanupTopic(SMnode *pMnode) {}
static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) {
int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
if (pRaw == NULL) goto WTF;
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime);
SDB_SET_INT64(pRaw, dataPos, pTopic->uid);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid);
SDB_SET_INT32(pRaw, dataPos, pTopic->version);
SDB_SET_INT32(pRaw, dataPos, pTopic->execLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->executor, pTopic->execLen);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE);
SDB_SET_DATALEN(pRaw, dataPos);
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, WTF);
SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->uid, WTF);
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, WTF);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, WTF);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, WTF);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, WTF);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, WTF);
SDB_SET_DATALEN(pRaw, dataPos, WTF);
WTF:
return pRaw;
}
......@@ -109,9 +108,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
......@@ -374,13 +373,11 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
int32_t numOfTopics = 0;
void *pIter = NULL;
while (1) {
STopicObj *pTopic = NULL;
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (strcmp(pTopic->db, dbName) == 0) {
numOfTopics++;
}
numOfTopics++;
sdbRelease(pSdb, pTopic);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册