提交 05a0774b 编写于 作者: L Liu Jicong

feat(tmq): add sub type

上级 d6094a43
......@@ -1473,15 +1473,25 @@ typedef struct {
int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp;
enum {
TOPIC_SUB_TYPE__DB = 1,
TOPIC_SUB_TYPE__TABLE,
TOPIC_SUB_TYPE__COLUMN,
};
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
int8_t subType;
char* sql;
char* ast;
char subscribeDbName[TSDB_DB_NAME_LEN];
union {
char* ast;
char subDbName[TSDB_DB_NAME_LEN];
char subStbName[TSDB_TABLE_FNAME_LEN];
};
} SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
......@@ -2145,11 +2155,6 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return buf;
}
enum {
TOPIC_SUB_TYPE__DB = 1,
TOPIC_SUB_TYPE__TABLE,
};
typedef struct {
SMsgHead head;
int64_t leftForVer;
......
......@@ -2677,10 +2677,12 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1;
/*if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;*/
/*if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;*/
/*if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;*/
if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
/*if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;*/
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
......@@ -2703,10 +2705,14 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1;
/*if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;*/
/*if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;*/
/*if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;*/
if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
/*if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {*/
/*if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;*/
/*}*/
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
......
......@@ -307,7 +307,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
}
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) {
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subDbName[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_TOPIC;
return -1;
}
......@@ -333,8 +333,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE;
topicObj.withTbName = pCreate->withTbName;
topicObj.withSchema = pCreate->withSchema;
/*topicObj.withTbName = pCreate->withTbName;*/
/*topicObj.withSchema = pCreate->withSchema;*/
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
......@@ -441,7 +441,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto CREATE_TOPIC_OVER;
}
pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName);
pDb = mndAcquireDb(pMnode, createTopicReq.subDbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
goto CREATE_TOPIC_OVER;
......
......@@ -122,10 +122,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
ASSERT(0);
}
/*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/
/*ASSERT(0);*/
/*}*/
TBC* pCur;
if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) {
ASSERT(0);
......
......@@ -3239,9 +3239,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, pReq->name);
pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
pReq->withTag = pStmt->pOptions->withTag;
/*pReq->withTbName = pStmt->pOptions->withTable;*/
/*pReq->withSchema = pStmt->pOptions->withSchema;*/
/*pReq->withTag = pStmt->pOptions->withTag;*/
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
......@@ -3262,7 +3262,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
dbName = pStmt->subscribeDbName;
}
tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName));
tNameGetFullDbName(&name, pReq->subscribeDbName);
tNameGetFullDbName(&name, pReq->subDbName);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册