未验证 提交 c67e0b62 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1897 from taosdata/feature/notifyTSDBstatus

tsdb notify the vnode about it status changes
...@@ -34,12 +34,14 @@ extern "C" { ...@@ -34,12 +34,14 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1 #define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
// --------- TSDB APPLICATION HANDLE DEFINITION // --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct { typedef struct {
// WAL handle
void *appH; void *appH;
void *cqH; void *cqH;
int (*walCallBack)(void *); int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
} STsdbAppH; } STsdbAppH;
......
...@@ -284,6 +284,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -284,6 +284,7 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
pRepo->tsdbCache->curBlock = NULL; pRepo->tsdbCache->curBlock = NULL;
tsdbUnLockRepo(repo); tsdbUnLockRepo(repo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbCommitData((void *)repo); tsdbCommitData((void *)repo);
tsdbCloseFileH(pRepo->tsdbFileH); tsdbCloseFileH(pRepo->tsdbFileH);
...@@ -330,7 +331,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) { ...@@ -330,7 +331,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
int32_t tsdbTriggerCommit(TsdbRepoT *repo) { int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbLockRepo(repo); tsdbLockRepo(repo);
if (pRepo->commit) { if (pRepo->commit) {
...@@ -942,7 +943,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { ...@@ -942,7 +943,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
// Commit to file // Commit to file
static void *tsdbCommitData(void *arg) { static void *tsdbCommitData(void *arg) {
printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache; STsdbCache *pCache = pRepo->tsdbCache;
...@@ -951,6 +951,8 @@ static void *tsdbCommitData(void *arg) { ...@@ -951,6 +951,8 @@ static void *tsdbCommitData(void *arg) {
SRWHelper whelper = {0}; SRWHelper whelper = {0};
if (pCache->imem == NULL) return NULL; if (pCache->imem == NULL) return NULL;
tsdbPrint("vgId: %d, starting to commit....", pRepo->config.tsdbId);
// Create the iterator to read from cache // Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
if (iters == NULL) { if (iters == NULL) {
...@@ -974,6 +976,7 @@ static void *tsdbCommitData(void *arg) { ...@@ -974,6 +976,7 @@ static void *tsdbCommitData(void *arg) {
// Do retention actions // Do retention actions
tsdbFitRetention(pRepo); tsdbFitRetention(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
_exit: _exit:
tdFreeDataCols(pDataCols); tdFreeDataCols(pDataCols);
...@@ -1176,4 +1179,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t * ...@@ -1176,4 +1179,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *
magic = *size; magic = *size;
return magic; return magic;
} }
\ No newline at end of file
...@@ -38,6 +38,7 @@ typedef struct { ...@@ -38,6 +38,7 @@ typedef struct {
int status; int status;
int8_t role; int8_t role;
int64_t version; int64_t version;
int64_t savedVersion;
void *wqueue; void *wqueue;
void *rqueue; void *rqueue;
void *wal; void *wal;
......
...@@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes; ...@@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes;
static void *tsDnodeVnodesHash; static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param); static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWalCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static bool vnodeReadVersion(SVnodeObj *pVnode); static bool vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeWalCallback(void *arg); static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
...@@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWalCallback; appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
sprintf(temp, "%s/tsdb", rootDir); sprintf(temp, "%s/tsdb", rootDir);
...@@ -374,14 +373,22 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -374,14 +373,22 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
vnodeSaveVersion(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
// TODO: this is a simple implement // TODO: this is a simple implement
static int vnodeWalCallback(void *arg) { static int vnodeProcessTsdbStatus(void *arg, int status) {
SVnodeObj *pVnode = arg; SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
if (status == TSDB_STATUS_COMMIT_START) {
pVnode->savedVersion = pVnode->version;
return walRenew(pVnode->wal);
}
if (status == TSDB_STATUS_COMMIT_OVER)
return vnodeSaveVersion(pVnode);
return 0;
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
...@@ -414,7 +421,7 @@ static void vnodeNotifyFileSynced(void *ahandle) { ...@@ -414,7 +421,7 @@ static void vnodeNotifyFileSynced(void *ahandle) {
tsdbCloseRepo(pVnode->tsdb); tsdbCloseRepo(pVnode->tsdb);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWalCallback; appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq; appH.cqH = pVnode->cq;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
} }
...@@ -685,14 +692,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { ...@@ -685,14 +692,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->version); len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->savedVersion);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp); fwrite(content, 1, len, fp);
fclose(fp); fclose(fp);
free(content); free(content);
vPrint("vgId:%d, save vnode version:%" PRId64 " successed", pVnode->vgId, pVnode->version); vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->savedVersion);
return 0; return 0;
} }
...@@ -734,7 +741,7 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { ...@@ -734,7 +741,7 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
ret = true; ret = true;
vPrint("vgId:%d, read vnode version successed, version:%%" PRId64, pVnode->vgId, pVnode->version); vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version);
PARSE_OVER: PARSE_OVER:
free(content); free(content);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册