未验证 提交 93f0c4c8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17462 from taosdata/fix/TD-19254-D

chore: code optimization for sma commit
...@@ -206,9 +206,9 @@ int32_t smaBegin(SSma* pSma); ...@@ -206,9 +206,9 @@ int32_t smaBegin(SSma* pSma);
int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma);
int32_t smaAsyncCommit(SSma* pSma); int32_t smaCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
......
...@@ -54,28 +54,28 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p ...@@ -54,28 +54,28 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p
#endif #endif
/** /**
* @brief Only applicable to Rollup SMA * @brief async commit, only applicable to Rollup SMA
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); }
/** /**
* @brief Only applicable to Rollup SMA * @brief async commit, only applicable to Rollup SMA
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); }
/** /**
* @brief Only applicable to Rollup SMA * @brief async commit, only applicable to Rollup SMA
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); }
/** /**
* @brief set rsma trigger stat active * @brief set rsma trigger stat active
...@@ -366,9 +366,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -366,9 +366,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
int32_t code = 0;
SVnode *pVnode = pSma->pVnode;
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma);
if (!pSmaEnv) { if (!pSmaEnv) {
return TSDB_CODE_SUCCESS; goto _exit;
} }
#if 0 #if 0
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
...@@ -378,8 +380,21 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { ...@@ -378,8 +380,21 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
#endif #endif
if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) {
return TSDB_CODE_SUCCESS; smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
if ((code = tsdbCommit(VND_RSMA2(pVnode))) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit;
}
_exit:
terrno = code;
return code;
} }
/** /**
......
...@@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) {
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit if (smaPreCommit(pVnode->pSma) < 0) {
// smaSyncPreCommit(pVnode->pSma); vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
if(smaAsyncPreCommit(pVnode->pSma) < 0){
vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -256,21 +254,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -256,21 +254,8 @@ int vnodeCommit(SVnode *pVnode) {
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
if (smaAsyncCommit(pVnode->pSma) < 0) { if (smaCommit(pVnode->pSma) < 0) {
vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
} else { } else {
...@@ -294,10 +279,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -294,10 +279,8 @@ int vnodeCommit(SVnode *pVnode) {
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
// postCommit if (smaPostCommit(pVnode->pSma) < 0) {
// smaSyncPostCommit(pVnode->pSma); vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
if (smaAsyncPostCommit(pVnode->pSma) < 0) {
vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -5,7 +5,7 @@ sleep 50 ...@@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
print =============== create database print =============== create database
sql create database d1 vgroups 1 sql create database d1 keep 36500d vgroups 1
sql use d1 sql use d1
print =============== create super table, include column type for count/sum/min/max/first print =============== create super table, include column type for count/sum/min/max/first
...@@ -25,8 +25,8 @@ if $rows != 1 then ...@@ -25,8 +25,8 @@ if $rows != 1 then
endi endi
print =============== insert data, mode1: one row one table in sql print =============== insert data, mode1: one row one table in sql
sql insert into ct1 values(now+0s, 10, 2.0, 3.0) sql insert into ct1 values('2022-10-19 09:55:45.682', 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) sql insert into ct1 values('2022-10-19 09:55:46.682', 11, 2.1, 3.1)('2022-10-19 09:55:47.682', -12, -2.2, -3.2)('2022-10-19 09:55:48.682', -13, -2.3, -3.3)
print =============== create sma index from super table print =============== create sma index from super table
...@@ -34,7 +34,7 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in ...@@ -34,7 +34,7 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in
print $data00 $data01 $data02 $data03 print $data00 $data01 $data02 $data03
print =============== trigger stream to execute sma aggr task and insert sma data into sma store print =============== trigger stream to execute sma aggr task and insert sma data into sma store
sql insert into ct1 values(now+5s, 20, 20.0, 30.0) sql insert into ct1 values('2022-10-19 09:55:50.682', 20, 20.0, 30.0)
#=================================================================== #===================================================================
print =============== show streams ================================ print =============== show streams ================================
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册