提交 69cb4b5c 编写于 作者: L Liu Jicong

refactor(tmq): persist ast in topic

上级 17c60b6c
......@@ -1884,7 +1884,6 @@ typedef struct {
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
char* qmsg;
} SMqSetCVgReq;
......@@ -1898,7 +1897,6 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->topicName);
tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->qmsg);
return tlen;
......@@ -1912,7 +1910,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeStringTo(buf, pReq->topicName);
buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeString(buf, &pReq->qmsg);
return buf;
......
......@@ -469,7 +469,10 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) return -1;
if (pResultInfo == NULL) {
(*numOfRows) = 0;
return 0;
}
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
......
......@@ -379,20 +379,20 @@ typedef struct {
} SFuncObj;
typedef struct {
int64_t id;
int8_t type;
int8_t replica;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t payloadLen;
void* pIter;
SMnode* pMnode;
int64_t id;
int8_t type;
int8_t replica;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t payloadLen;
void* pIter;
SMnode* pMnode;
STableMetaRsp* pMeta;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
} SShowObj;
typedef struct {
......@@ -625,14 +625,14 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->consumers) {
//taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
// taosArrayDestroy(pSub->consumers);
// taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
// taosArrayDestroy(pSub->consumers);
pSub->consumers = NULL;
}
if (pSub->unassignedVg) {
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
// taosArrayDestroy(pSub->unassignedVg);
// taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
// taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL;
}
}
......@@ -647,8 +647,9 @@ typedef struct {
int32_t version;
SRWLatch lock;
int32_t sqlLen;
int32_t astLen;
char* sql;
char* logicalPlan;
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
} SMqTopicObj;
......
......@@ -201,6 +201,7 @@ static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
mTrace("offset:%s, perform update action", pOldOffset->key);
pOldOffset->offset = pNewOffset->offset;
return 0;
}
......
......@@ -469,6 +469,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
mDebug("mq rebalance: vg %d push to unassignedVg", pConsumerEp->vgId);
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
......@@ -512,6 +513,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
mDebug("mq rebalance: vg %d pop from unassignedVg", pConsumerEp->vgId);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
......@@ -570,7 +572,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
.vgId = vgId,
.consumerId = pConsumerEp->consumerId,
.sql = pTopic->sql,
.logicalPlan = pTopic->logicalPlan,
.physicalPlan = pTopic->physicalPlan,
.qmsg = pConsumerEp->qmsg,
};
......
......@@ -26,7 +26,7 @@
#include "parser.h"
#include "tname.h"
#define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_VER_NUMBER 1
#define MND_TOPIC_RESERVE_SIZE 64
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
......@@ -52,7 +52,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
return sdbSetTable(pMnode->pSdb, table);
......@@ -63,11 +63,10 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
int32_t swLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
int32_t schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
int32_t size =
sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + swLen + MND_TOPIC_RESERVE_SIZE;
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
......@@ -81,19 +80,19 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
void *swBuf = taosMemoryMalloc(swLen);
void *swBuf = taosMemoryMalloc(schemaLen);
if (swBuf == NULL) {
goto TOPIC_ENCODE_OVER;
}
void *aswBuf = swBuf;
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_INT32(pRaw, dataPos, swLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, swLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
......@@ -137,23 +136,25 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen + 1, sizeof(char));
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
if (pTopic->sql == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
if (pTopic->logicalPlan == NULL) {
SDB_GET_INT32(pRaw, dataPos, &pTopic->astLen, TOPIC_DECODE_OVER);
pTopic->ast = taosMemoryCalloc(pTopic->astLen, sizeof(char));
if (pTopic->ast == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->physicalPlan = taosMemoryCalloc(len + 1, sizeof(char));
pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
if (pTopic->physicalPlan == NULL) {
taosMemoryFree(pTopic->logicalPlan);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
......@@ -257,6 +258,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
return 0;
}
#if 0
static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS;
......@@ -279,6 +281,7 @@ static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
terrno = code;
return code;
}
#endif
static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
mDebug("topic:%s to create", pCreate->name);
......@@ -290,32 +293,39 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
topicObj.dbUid = pDb->uid;
topicObj.version = 1;
topicObj.sql = pCreate->sql;
topicObj.physicalPlan = "";
topicObj.logicalPlan = "";
topicObj.sqlLen = strlen(pCreate->sql);
char *pPlanStr = NULL;
if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
if (NULL != pPlanStr) {
topicObj.physicalPlan = pPlanStr;
}
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) < 0) {
SQueryPlan *pPlan = NULL;
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
if (nodesNodeToString(pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFreeClear(pPlanStr);
taosMemoryFreeClear(topicObj.physicalPlan);
return -1;
}
mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
......@@ -323,7 +333,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
taosMemoryFreeClear(pPlanStr);
taosMemoryFreeClear(topicObj.physicalPlan);
mndTransDrop(pTrans);
return -1;
}
......@@ -331,12 +341,12 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
taosMemoryFreeClear(pPlanStr);
taosMemoryFreeClear(topicObj.physicalPlan);
mndTransDrop(pTrans);
return -1;
}
taosMemoryFreeClear(pPlanStr);
taosMemoryFreeClear(topicObj.physicalPlan);
mndTransDrop(pTrans);
return 0;
}
......
......@@ -88,6 +88,8 @@ void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen)
}
TEST_F(MndTestTopic, 01_Create_Topic) {
// TODO add valid ast for unit test
#if 0
const char* dbname = "1.d1";
const char* topicName = "1.d1.t1";
......@@ -171,4 +173,5 @@ TEST_F(MndTestTopic, 01_Create_Topic) {
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
#endif
}
......@@ -126,8 +126,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); }
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->logicalPlan) + strlen(pTopic->physicalPlan) +
strlen(pTopic->qmsg) + sizeof(int64_t) * 3;
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) +
sizeof(int64_t) * 3;
}
int32_t tqGetConsumerHandleSize(const STqConsumer* pConsumer) {
......@@ -144,7 +144,6 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic)
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopic->topicName);
/*tlen += taosEncodeString(buf, pTopic->sql);*/
/*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/
/*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/
tlen += taosEncodeString(buf, pTopic->qmsg);
/*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/
......@@ -156,7 +155,6 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic)
static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopic) {
buf = taosDecodeStringTo(buf, pTopic->topicName);
/*buf = taosDecodeString(buf, &pTopic->sql);*/
/*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/
/*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/
buf = taosDecodeString(buf, &pTopic->qmsg);
/*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/
......@@ -722,7 +720,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
}
strcpy(pTopic->topicName, req.topicName);
pTopic->sql = req.sql;
pTopic->logicalPlan = req.logicalPlan;
pTopic->physicalPlan = req.physicalPlan;
pTopic->qmsg = req.qmsg;
/*pTopic->committedOffset = -1;*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册