提交 fbf9cb70 编写于 作者: C Cary Xu

enh: tsma code refactor

上级 7a6f94a8
...@@ -44,6 +44,7 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); ...@@ -44,6 +44,7 @@ static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
static void mndDestroySmaObj(SSmaObj *pSmaObj);
int32_t mndInitSma(SMnode *pMnode) { int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
...@@ -390,7 +391,9 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb ...@@ -390,7 +391,9 @@ static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
taosRLockLatch(&pStb->lock); taosRLockLatch(&pStb->lock);
memcpy(&stbObj, pStb, sizeof(SStbObj)); memcpy(&stbObj, pStb, sizeof(SStbObj));
taosRUnLockLatch(&pStb->lock); taosRUnLockLatch(&pStb->lock);
stbObj.numOfColumns = 0;
stbObj.pColumns = NULL; stbObj.pColumns = NULL;
stbObj.numOfTags = 0;
stbObj.pTags = NULL; stbObj.pTags = NULL;
stbObj.updateTime = taosGetTimestampMs(); stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0; stbObj.lock = 0;
...@@ -501,6 +504,13 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, ...@@ -501,6 +504,13 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
return 0; return 0;
} }
static void mndDestroySmaObj(SSmaObj *pSmaObj) {
if (pSmaObj) {
taosMemoryFreeClear(pSmaObj->schemaRow.pSchema);
taosMemoryFreeClear(pSmaObj->schemaTag.pSchema);
}
}
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) { static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
SSmaObj smaObj = {0}; SSmaObj smaObj = {0};
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
...@@ -524,29 +534,17 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -524,29 +534,17 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
smaObj.tagsFilterLen = pCreate->tagsFilterLen; smaObj.tagsFilterLen = pCreate->tagsFilterLen;
smaObj.sqlLen = pCreate->sqlLen; smaObj.sqlLen = pCreate->sqlLen;
smaObj.astLen = pCreate->astLen; smaObj.astLen = pCreate->astLen;
if (smaObj.exprLen > 0) { if (smaObj.exprLen > 0) {
smaObj.expr = taosMemoryMalloc(smaObj.exprLen); smaObj.expr = pCreate->expr;
if (smaObj.expr == NULL) goto _OVER;
memcpy(smaObj.expr, pCreate->expr, smaObj.exprLen);
} }
if (smaObj.tagsFilterLen > 0) { if (smaObj.tagsFilterLen > 0) {
smaObj.tagsFilter = taosMemoryMalloc(smaObj.tagsFilterLen); smaObj.tagsFilter = pCreate->tagsFilter;
if (smaObj.tagsFilter == NULL) goto _OVER;
memcpy(smaObj.tagsFilter, pCreate->tagsFilter, smaObj.tagsFilterLen);
} }
if (smaObj.sqlLen > 0) { if (smaObj.sqlLen > 0) {
smaObj.sql = taosMemoryMalloc(smaObj.sqlLen); smaObj.sql = pCreate->sql;
if (smaObj.sql == NULL) goto _OVER;
memcpy(smaObj.sql, pCreate->sql, smaObj.sqlLen);
} }
if (smaObj.astLen > 0) { if (smaObj.astLen > 0) {
smaObj.ast = taosMemoryMalloc(smaObj.astLen); smaObj.ast = pCreate->ast;
if (smaObj.ast == NULL) goto _OVER;
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
} }
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
...@@ -589,6 +587,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -589,6 +587,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
code = 0; code = 0;
_OVER: _OVER:
mndDestroySmaObj(&smaObj);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
...@@ -1013,7 +1012,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool ...@@ -1013,7 +1012,6 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
rsp->version = pStb->smaVer; rsp->version = pStb->smaVer;
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break; if (pIter == NULL) break;
......
...@@ -132,7 +132,7 @@ ...@@ -132,7 +132,7 @@
#./test.sh -f tsim/mnode/basic1.sim -m #./test.sh -f tsim/mnode/basic1.sim -m
# --- sma # --- sma
#./test.sh -f tsim/sma/tsmaCreateInsertData.sim ./test.sh -f tsim/sma/tsmaCreateInsertData.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# --- valgrind # --- valgrind
......
...@@ -37,6 +37,14 @@ print =============== trigger stream to execute sma aggr task and insert sma dat ...@@ -37,6 +37,14 @@ print =============== trigger stream to execute sma aggr task and insert sma dat
sql insert into ct1 values(now+5s, 20, 20.0, 30.0) sql insert into ct1 values(now+5s, 20, 20.0, 30.0)
#=================================================================== #===================================================================
print =============== show streams ================================
sql show streams;
print $data00 $data01 $data02
if $data00 != d1 then
return -1
endi
print =============== select * from ct1 from memory print =============== select * from ct1 from memory
sql select * from ct1; sql select * from ct1;
print $data00 $data01 print $data00 $data01
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册