提交 4328ac12 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

tsdb notify the vnode about it status changes

上级 b3a1070b
......@@ -34,12 +34,14 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
// WAL handle
void *appH;
void *cqH;
int (*walCallBack)(void *);
int (*notifyStatus)(void *, int status);
int (*eventCallBack)(void *);
} STsdbAppH;
......
......@@ -330,7 +330,7 @@ int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg) {
int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo->appH.walCallBack) pRepo->appH.walCallBack(pRepo->appH.appH);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
tsdbLockRepo(repo);
if (pRepo->commit) {
......@@ -942,7 +942,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
// Commit to file
static void *tsdbCommitData(void *arg) {
printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache;
......@@ -951,6 +950,8 @@ static void *tsdbCommitData(void *arg) {
SRWHelper whelper = {0};
if (pCache->imem == NULL) return NULL;
tsdbPrint("vgId: %d, starting to commit....", pRepo->config.tsdbId);
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
if (iters == NULL) {
......@@ -974,6 +975,7 @@ static void *tsdbCommitData(void *arg) {
// Do retention actions
tsdbFitRetention(pRepo);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
_exit:
tdFreeDataCols(pDataCols);
......@@ -1176,4 +1178,4 @@ uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *
magic = *size;
return magic;
}
\ No newline at end of file
}
......@@ -33,12 +33,11 @@ static int32_t tsOpennedVnodes;
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWalCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static bool vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeWalCallback(void *arg);
static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
......@@ -206,7 +205,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWalCallback;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
sprintf(temp, "%s/tsdb", rootDir);
......@@ -374,14 +373,20 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
walClose(pVnode->wal);
pVnode->wal = NULL;
vnodeSaveVersion(pVnode);
vnodeRelease(pVnode);
}
// TODO: this is a simple implement
static int vnodeWalCallback(void *arg) {
static int vnodeProcessTsdbStatus(void *arg, int status) {
SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
if (status == TSDB_STATUS_COMMIT_START)
return walRenew(pVnode->wal);
if (status == TSDB_STATUS_COMMIT_OVER)
return vnodeSaveVersion(pVnode);
return 0;
}
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
......@@ -414,7 +419,7 @@ static void vnodeNotifyFileSynced(void *ahandle) {
tsdbCloseRepo(pVnode->tsdb);
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWalCallback;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册