提交 2a203472 编写于 作者: H Hongze Cheng

add more code

上级 df6eb27b
...@@ -75,6 +75,7 @@ typedef struct SStreamStateWriter SStreamStateWriter; ...@@ -75,6 +75,7 @@ typedef struct SStreamStateWriter SStreamStateWriter;
typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
typedef struct SCommitInfo SCommitInfo;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -147,7 +148,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* ...@@ -147,7 +148,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg*
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
...@@ -205,7 +206,7 @@ int32_t smaSyncPreCommit(SSma* pSma); ...@@ -205,7 +206,7 @@ int32_t smaSyncPreCommit(SSma* pSma);
int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma);
int32_t smaSyncPostCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma);
int32_t smaPreCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma); int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaDoRetention(SSma* pSma, int64_t now);
...@@ -407,6 +408,11 @@ struct SSnapDataHdr { ...@@ -407,6 +408,11 @@ struct SSnapDataHdr {
uint8_t data[]; uint8_t data[];
}; };
struct SCommitInfo {
SVnodeInfo info;
SVnode* pVnode;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,7 +23,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); ...@@ -23,7 +23,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma);
#endif #endif
static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma);
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo);
static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma);
static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat);
...@@ -67,7 +67,7 @@ int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); ...@@ -67,7 +67,7 @@ int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma);
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } int32_t smaCommit(SSma *pSma, SCommitInfo *pInfo) { return tdProcessRSmaAsyncCommitImpl(pSma, pInfo); }
/** /**
* @brief async commit, only applicable to Rollup SMA * @brief async commit, only applicable to Rollup SMA
...@@ -387,7 +387,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -387,7 +387,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
* @param pSma * @param pSma
* @return int32_t * @return int32_t
*/ */
static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
#if 0 #if 0
...@@ -399,11 +399,11 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { ...@@ -399,11 +399,11 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
} }
#endif #endif
if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) { if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit; goto _exit;
} }
if ((code = tsdbCommit(VND_RSMA2(pVnode))) < 0) { if ((code = tsdbCommit(VND_RSMA2(pVnode), pInfo)) < 0) {
smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code));
goto _exit; goto _exit;
} }
......
...@@ -93,7 +93,7 @@ typedef struct { ...@@ -93,7 +93,7 @@ typedef struct {
SArray *aDelData; // SArray<SDelData> SArray *aDelData; // SArray<SDelData>
} SCommitter; } SCommitter;
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo);
static int32_t tsdbCommitData(SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter); static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter); static int32_t tsdbCommitCache(SCommitter *pCommitter);
...@@ -160,7 +160,7 @@ int32_t tsdbPrepareCommit(STsdb *pTsdb) { ...@@ -160,7 +160,7 @@ int32_t tsdbPrepareCommit(STsdb *pTsdb) {
return 0; return 0;
} }
int32_t tsdbCommit(STsdb *pTsdb) { int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
if (!pTsdb) return 0; if (!pTsdb) return 0;
int32_t code = 0; int32_t code = 0;
...@@ -179,7 +179,7 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -179,7 +179,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
} }
// start commit // start commit
code = tsdbStartCommit(pTsdb, &commith); code = tsdbStartCommit(pTsdb, &commith, pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// commit impl // commit impl
...@@ -816,7 +816,7 @@ _exit: ...@@ -816,7 +816,7 @@ _exit:
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -824,13 +824,13 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -824,13 +824,13 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
ASSERT(pTsdb->imem && "last tsdb commit incomplete"); ASSERT(pTsdb->imem && "last tsdb commit incomplete");
pCommitter->pTsdb = pTsdb; pCommitter->pTsdb = pTsdb;
pCommitter->commitID = pTsdb->pVnode->state.commitID; pCommitter->commitID = pInfo->info.state.commitID;
pCommitter->minutes = pTsdb->keepCfg.days; pCommitter->minutes = pTsdb->keepCfg.days;
pCommitter->precision = pTsdb->keepCfg.precision; pCommitter->precision = pTsdb->keepCfg.precision;
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = pTsdb->pVnode->config.sttTrigger; pCommitter->sttTrigger = pInfo->info.config.sttTrigger;
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) { if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -14,11 +14,7 @@ ...@@ -14,11 +14,7 @@
*/ */
#include "vnd.h" #include "vnd.h"
#include "vnodeInt.h"
typedef struct {
SVnodeInfo info;
SVnode *pVnode;
} SCommitInfo;
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
...@@ -291,11 +287,11 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -291,11 +287,11 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbCommit(pVnode->pTsdb); code = tsdbCommit(pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
code = smaCommit(pVnode->pSma); code = smaCommit(pVnode->pSma, pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册