diff --git a/include/dnode/vnode/tsdb2/tsdb.h b/include/dnode/vnode/tsdb2/tsdb.h
index 986faffedf134748008dd6a468c53ee7be9928bd..d3228bcade829e9f6137595732238b52316af6d7 100644
--- a/include/dnode/vnode/tsdb2/tsdb.h
+++ b/include/dnode/vnode/tsdb2/tsdb.h
@@ -102,8 +102,8 @@ typedef struct STsdb STsdb;
STsdbCfg *tsdbGetCfg(const STsdb *repo);
// --------- TSDB REPOSITORY DEFINITION
-int32_t tsdbCreateRepo(int repoid);
-int32_t tsdbDropRepo(int repoid);
+// int32_t tsdbCreateRepo(int repoid);
+// int32_t tsdbDropRepo(int repoid);
STsdb * tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbClose(STsdb *repo, int toCommit);
int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg);
@@ -221,6 +221,7 @@ typedef struct SMemRef {
SMemSnapshot snapshot;
} SMemRef;
+#if 0
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
@@ -418,20 +419,21 @@ void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
-// For TSDB Compact
-int tsdbCompact(STsdb *pRepo);
+// // For TSDB Compact
+// int tsdbCompact(STsdb *pRepo);
// For TSDB Health Monitor
-// no problem return true
-bool tsdbNoProblem(STsdb *pRepo);
-// unit of walSize: MB
-int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
+// // no problem return true
+// bool tsdbNoProblem(STsdb *pRepo);
+// // unit of walSize: MB
+// int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
-// for json tag
-void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
-void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
-char *parseTagDatatoJson(void *p);
+// // for json tag
+// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
+// void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
+// char *parseTagDatatoJson(void *p);
+#endif
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/tsdb2/inc/tsdbCompact.h b/source/dnode/vnode/tsdb2/inc/tsdbCompact.h
index e58332b4a16dd6be9ea3a8b3e74fa294923be67d..704dce0caad9a26cc555450b085a9b3c0e3c25c0 100644
--- a/source/dnode/vnode/tsdb2/inc/tsdbCompact.h
+++ b/source/dnode/vnode/tsdb2/inc/tsdbCompact.h
@@ -12,17 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#ifndef _TD_TSDB_COMPACT_H_
-#define _TD_TSDB_COMPACT_H_
+// #ifndef _TD_TSDB_COMPACT_H_
+// #define _TD_TSDB_COMPACT_H_
-#ifdef __cplusplus
-extern "C" {
-#endif
+// #ifdef __cplusplus
+// extern "C" {
+// #endif
-void *tsdbCompactImpl(STsdb *pRepo);
+// void *tsdbCompactImpl(STsdb *pRepo);
-#ifdef __cplusplus
-}
-#endif
+// #ifdef __cplusplus
+// }
+// #endif
-#endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
+// #endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h b/source/dnode/vnode/tsdb2/inc/tsdbHealth.h
deleted file mode 100644
index 8ce818f964c38ea70d756b376cb23f6b22650761..0000000000000000000000000000000000000000
--- a/source/dnode/vnode/tsdb2/inc/tsdbHealth.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-// #ifndef _TD_TSDB_HEALTH_H_
-// #define _TD_TSDB_HEALTH_H_
-
-// #include "os.h"
-// #include "tsdb.h"
-
-// bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
-// int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
-
-// bool tsdbIdleMemEnough();
-// bool tsdbAllowNewBlock(STsdbRepo* pRepo);
-
-// #endif /* _TD_TSDB_BUFFER_H_ */
diff --git a/source/dnode/vnode/tsdb2/src/tsdbCompact.c b/source/dnode/vnode/tsdb2/src/tsdbCompact.c
index baba45f9732557fb04b22893d7f18b616c55632a..57f2742c33112f0bb6df08da3682f565277c045e 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbCompact.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbCompact.c
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
+#if 0
#include "tsdbint.h"
typedef struct {
@@ -538,3 +540,4 @@ static int tsdbCompactMeta(STsdb *pRepo) {
return 0;
}
+#endif
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/src/tsdbFile.c b/source/dnode/vnode/tsdb2/src/tsdbFile.c
index daff85f0152d830f3800d8f08dcbcd91f2eee6c4..f2a0652f03f5d81dfa2af1be8be269cb29eba14f 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbFile.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbFile.c
@@ -217,7 +217,7 @@ int tsdbScanAndTryFixMFile(STsdb *pRepo) {
return -1;
}
- if (taosFtruncate(mf.fd, mf.info.size) < 0) {
+ if (taosFtruncateFile(mf.fd, mf.info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(&mf);
return -1;
@@ -276,7 +276,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
return -1;
}
- if (taosFtruncate(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) {
+ if (taosFtruncateFile(TSDB_FILE_FD(&mf), pMFile->info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(&mf);
return -1;
@@ -459,7 +459,7 @@ static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
return -1;
}
- if (taosFtruncate(df.fd, df.info.size) < 0) {
+ if (taosFtruncateFile(df.fd, df.info.size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFile(&df);
return -1;
diff --git a/source/dnode/vnode/tsdb2/src/tsdbHealth.c b/source/dnode/vnode/tsdb2/src/tsdbHealth.c
deleted file mode 100644
index 1d70d2123def09ae171925a88e72ea8330be8cff..0000000000000000000000000000000000000000
--- a/source/dnode/vnode/tsdb2/src/tsdbHealth.c
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#if 0
-
-#include "tsdbHealth.h"
-#include "os.h"
-#include "query.h"
-#include "tarray.h"
-#include "tglobal.h"
-#include "tlist.h"
-#include "tmsg.h"
-#include "tsdbBuffer.h"
-#include "tsdbLog.h"
-#include "tsdbint.h"
-#include "tthread.h"
-#include "ttimer.h"
-
-// return malloc new block count
-int32_t tsdbInsertNewBlock(STsdbRepo* pRepo) {
- STsdbBufPool* pPool = pRepo->pPool;
- int32_t cnt = 0;
-
- if (tsdbAllowNewBlock(pRepo)) {
- STsdbBufBlock* pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
- if (pBufBlock) {
- if (tdListAppend(pPool->bufBlockList, (void*)(&pBufBlock)) < 0) {
- // append error
- tsdbFreeBufBlock(pBufBlock);
- } else {
- pPool->nElasticBlocks++;
- cnt++;
- }
- }
- }
- return cnt;
-}
-
-// switch anther thread to run
-void* cbKillQueryFree(void* param) {
- STsdbRepo* pRepo = (STsdbRepo*)param;
- // vnode
- if (pRepo->appH.notifyStatus) {
- pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
- }
-
- // free
- if (pRepo->pthread) {
- void* p = pRepo->pthread;
- pRepo->pthread = NULL;
- free(p);
- }
-
- return NULL;
-}
-
-// return true do free , false do nothing
-bool tsdbUrgeQueryFree(STsdbRepo* pRepo) {
- // check previous running
- if (pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
- tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo),
- pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
- return false;
- }
- // create new
- pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
- if (pRepo->pthread == NULL) {
- tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
- return false;
- }
- return true;
-}
-
-bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
- int32_t nMaxElastic = pRepo->config.totalBlocks / 3;
- STsdbBufPool* pPool = pRepo->pPool;
- if (pPool->nElasticBlocks >= nMaxElastic) {
- tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo),
- pPool->nElasticBlocks, nMaxElastic);
- return false;
- }
- return true;
-}
-
-bool tsdbNoProblem(STsdbRepo* pRepo) {
- if (listNEles(pRepo->pPool->bufBlockList) == 0) return false;
- return true;
-}
-
-#endif
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/src/tsdbMain.c b/source/dnode/vnode/tsdb2/src/tsdbMain.c
index 78a784824e3b18517aaf94ee04d9f5cf877b0564..661d6b2e3ecf9b1551c60db6b9fb355f3beb617e 100644
--- a/source/dnode/vnode/tsdb2/src/tsdbMain.c
+++ b/source/dnode/vnode/tsdb2/src/tsdbMain.c
@@ -32,36 +32,36 @@ static void tsdbStopStream(STsdb *pRepo);
static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
-// Function declaration
-int32_t tsdbCreateRepo(int repoid) {
- char tsdbDir[TSDB_FILENAME_LEN] = "\0";
- char dataDir[TSDB_FILENAME_LEN] = "\0";
-
- tsdbGetRootDir(repoid, tsdbDir);
- if (tfsMkdir(tsdbDir) < 0) {
- goto _err;
- }
+// // Function declaration
+// int32_t tsdbCreateRepo(int repoid) {
+// char tsdbDir[TSDB_FILENAME_LEN] = "\0";
+// char dataDir[TSDB_FILENAME_LEN] = "\0";
+
+// tsdbGetRootDir(repoid, tsdbDir);
+// if (tfsMkdir(tsdbDir) < 0) {
+// goto _err;
+// }
- tsdbGetDataDir(repoid, dataDir);
- if (tfsMkdir(dataDir) < 0) {
- goto _err;
- }
+// tsdbGetDataDir(repoid, dataDir);
+// if (tfsMkdir(dataDir) < 0) {
+// goto _err;
+// }
- // TODO: need to create current file with nothing in
+// // TODO: need to create current file with nothing in
- return 0;
+// return 0;
-_err:
- tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
- return -1;
-}
+// _err:
+// tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
+// return -1;
+// }
-int32_t tsdbDropRepo(int repoid) {
- char tsdbDir[TSDB_FILENAME_LEN] = "\0";
+// int32_t tsdbDropRepo(int repoid) {
+// char tsdbDir[TSDB_FILENAME_LEN] = "\0";
- tsdbGetRootDir(repoid, tsdbDir);
- return tfsRmdir(tsdbDir);
-}
+// tsdbGetRootDir(repoid, tsdbDir);
+// return tfsRmdir(tsdbDir);
+// }
STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdb *pRepo;
diff --git a/source/dnode/vnode/tsdb2/src/tsdbScan.c b/source/dnode/vnode/tsdb2/src/tsdbScan.c
deleted file mode 100644
index 382f7b11ae021152c9c8d314b24428b51c2e107b..0000000000000000000000000000000000000000
--- a/source/dnode/vnode/tsdb2/src/tsdbScan.c
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#include "tsdbint.h"
-
-#if 0
-#ifndef _TSDB_PLUGINS
-
-int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
-
-STsdbScanHandle* tsdbNewScanHandle() { return NULL; }
-
-void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {}
-
-int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
-
-int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle) { return 0; }
-
-int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; }
-
-int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }
-
-void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle) {}
-
-#endif
-#endif
\ No newline at end of file
diff --git a/source/dnode/vnode/tsdb2/src/tsdbSync.c b/source/dnode/vnode/tsdb2/src/tsdbSync.c
deleted file mode 100644
index 9f63c8515d351e698f7cd33e41b0bef4f8aa832f..0000000000000000000000000000000000000000
--- a/source/dnode/vnode/tsdb2/src/tsdbSync.c
+++ /dev/null
@@ -1,727 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "taoserror.h"
-#include "tsdbint.h"
-
-// Sync handle
-typedef struct {
- STsdb *pRepo;
- SRtn rtn;
- SOCKET socketFd;
- void * pBuf;
- bool mfChanged;
- SMFile * pmf;
- SMFile mf;
- SDFileSet df;
- SDFileSet *pdf;
-} SSyncH;
-
-#define SYNC_BUFFER(sh) ((sh)->pBuf)
-
-static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd);
-static void tsdbDestroySyncH(SSyncH *pSyncH);
-static int32_t tsdbSyncSendMeta(SSyncH *pSynch);
-static int32_t tsdbSyncRecvMeta(SSyncH *pSynch);
-static int32_t tsdbSendMetaInfo(SSyncH *pSynch);
-static int32_t tsdbRecvMetaInfo(SSyncH *pSynch);
-static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend);
-static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend);
-static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch);
-static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch);
-static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
-static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
-static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
-static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch);
-static int tsdbReload(STsdb *pRepo, bool isMfChanged);
-
-int32_t tsdbSyncSend(void *tsdb, SOCKET socketFd) {
- STsdb *pRepo = (STsdb *)tsdb;
- SSyncH synch = {0};
-
- tsdbInitSyncH(&synch, pRepo, socketFd);
- // Disable TSDB commit
- tsem_wait(&(pRepo->readyToCommit));
-
- if (tsdbSyncSendMeta(&synch) < 0) {
- tsdbError("vgId:%d, failed to send metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- goto _err;
- }
-
- if (tsdbSyncSendDFileSetArray(&synch) < 0) {
- tsdbError("vgId:%d, failed to send filesets since %s", REPO_ID(pRepo), tstrerror(terrno));
- goto _err;
- }
-
- // Enable TSDB commit
- tsem_post(&(pRepo->readyToCommit));
- tsdbDestroySyncH(&synch);
- return 0;
-
-_err:
- tsem_post(&(pRepo->readyToCommit));
- tsdbDestroySyncH(&synch);
- return -1;
-}
-
-int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) {
- STsdb *pRepo = (STsdb *)tsdb;
- SSyncH synch = {0};
-
- pRepo->state = TSDB_STATE_OK;
-
- tsdbInitSyncH(&synch, pRepo, socketFd);
- tsem_wait(&(pRepo->readyToCommit));
- tsdbStartFSTxn(pRepo, 0, 0);
-
- if (tsdbSyncRecvMeta(&synch) < 0) {
- tsdbError("vgId:%d, failed to recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- goto _err;
- }
-
- if (tsdbSyncRecvDFileSetArray(&synch) < 0) {
- tsdbError("vgId:%d, failed to recv filesets since %s", REPO_ID(pRepo), tstrerror(terrno));
- goto _err;
- }
-
- tsdbEndFSTxn(pRepo);
- tsem_post(&(pRepo->readyToCommit));
- tsdbDestroySyncH(&synch);
-
- // Reload file change
- tsdbReload(pRepo, synch.mfChanged);
-
- return 0;
-
-_err:
- tsdbEndFSTxnWithError(REPO_FS(pRepo));
- tsem_post(&(pRepo->readyToCommit));
- tsdbDestroySyncH(&synch);
- return -1;
-}
-
-static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd) {
- pSyncH->pRepo = pRepo;
- pSyncH->socketFd = socketFd;
- tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
-}
-
-static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); }
-
-static int32_t tsdbSyncSendMeta(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- bool toSendMeta = false;
- SMFile mf;
-
- // Send meta info to remote
- tsdbInfo("vgId:%d, metainfo will be sent", REPO_ID(pRepo));
- if (tsdbSendMetaInfo(pSynch) < 0) {
- tsdbError("vgId:%d, failed to send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- if (pRepo->fs->cstatus->pmf == NULL) {
- // No meta file, not need to wait to retrieve meta file
- tsdbInfo("vgId:%d, metafile not exist, no need to send", REPO_ID(pRepo));
- return 0;
- }
-
- if (tsdbRecvDecision(pSynch, &toSendMeta) < 0) {
- tsdbError("vgId:%d, failed to recv decision while send meta since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- if (toSendMeta) {
- tsdbInitMFileEx(&mf, pRepo->fs->cstatus->pmf);
- if (tsdbOpenMFile(&mf, O_RDONLY) < 0) {
- tsdbError("vgId:%d, failed to open file while send metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- int64_t writeLen = mf.info.size;
- tsdbInfo("vgId:%d, metafile:%s will be sent, size:%" PRId64, REPO_ID(pRepo), mf.f.aname, writeLen);
-
- int64_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to send metafile since %s, ret:%" PRId64 " writeLen:%" PRId64, REPO_ID(pRepo),
- tstrerror(terrno), ret, writeLen);
- tsdbCloseMFile(&mf);
- return -1;
- }
-
- tsdbCloseMFile(&mf);
- tsdbInfo("vgId:%d, metafile is sent", REPO_ID(pRepo));
- } else {
- tsdbInfo("vgId:%d, metafile is same, no need to send", REPO_ID(pRepo));
- }
-
- return 0;
-}
-
-static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- SMFile * pLMFile = pRepo->fs->cstatus->pmf;
-
- // Recv meta info from remote
- if (tsdbRecvMetaInfo(pSynch) < 0) {
- tsdbError("vgId:%d, failed to recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- // No meta file, do nothing (rm local meta file)
- if (pSynch->pmf == NULL) {
- if (pLMFile == NULL) {
- pSynch->mfChanged = false;
- } else {
- pSynch->mfChanged = true;
- }
- tsdbInfo("vgId:%d, metafile not exist in remote, no need to recv", REPO_ID(pRepo));
- return 0;
- }
-
- if (pLMFile == NULL || pSynch->pmf->info.size != pLMFile->info.size ||
- pSynch->pmf->info.magic != pLMFile->info.magic || TSDB_FILE_IS_BAD(pLMFile)) {
- // Local has no meta file or has a different meta file, need to copy from remote
- pSynch->mfChanged = true;
-
- if (tsdbSendDecision(pSynch, true) < 0) {
- tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- tsdbInfo("vgId:%d, metafile will be received", REPO_ID(pRepo));
-
- // Recv from remote
- SMFile mf;
- SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
- tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
- if (tsdbCreateMFile(&mf, false) < 0) {
- tsdbError("vgId:%d, failed to create file while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- tsdbInfo("vgId:%d, metafile:%s is created", REPO_ID(pRepo), mf.f.aname);
-
- int64_t readLen = pSynch->pmf->info.size;
- int64_t ret = taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), readLen);
- if (ret != readLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv metafile since %s, ret:%" PRId64 " readLen:%" PRId64, REPO_ID(pRepo),
- tstrerror(terrno), ret, readLen);
- tsdbCloseMFile(&mf);
- tsdbRemoveMFile(&mf);
- return -1;
- }
-
- tsdbInfo("vgId:%d, metafile is received, size:%" PRId64, REPO_ID(pRepo), readLen);
-
- mf.info = pSynch->pmf->info;
- tsdbCloseMFile(&mf);
- tsdbUpdateMFile(REPO_FS(pRepo), &mf);
- } else {
- pSynch->mfChanged = false;
- tsdbInfo("vgId:%d, metafile is same, no need to recv", REPO_ID(pRepo));
- if (tsdbSendDecision(pSynch, false) < 0) {
- tsdbError("vgId:%d, failed to send decision while recv metafile since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
- tsdbUpdateMFile(REPO_FS(pRepo), pLMFile);
- }
-
- return 0;
-}
-
-static int32_t tsdbSendMetaInfo(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- uint32_t tlen = 0;
- SMFile * pMFile = pRepo->fs->cstatus->pmf;
-
- if (pMFile) {
- tlen = tlen + tsdbEncodeSMFileEx(NULL, pMFile) + sizeof(TSCKSUM);
- }
-
- if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) {
- tsdbError("vgId:%d, failed to makeroom while send metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- void *ptr = SYNC_BUFFER(pSynch);
- taosEncodeFixedU32(&ptr, tlen);
- void *tptr = ptr;
- if (pMFile) {
- tsdbEncodeSMFileEx(&ptr, pMFile);
- taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
- }
-
- int32_t writeLen = tlen + sizeof(uint32_t);
- int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to send metainfo since %s, ret:%d writeLen:%d", REPO_ID(pRepo), tstrerror(terrno), ret,
- writeLen);
- return -1;
- }
-
- tsdbInfo("vgId:%d, metainfo is sent, tlen:%d, writeLen:%d", REPO_ID(pRepo), tlen, writeLen);
- return 0;
-}
-
-static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- uint32_t tlen = 0;
- char buf[64] = {0};
-
- int32_t readLen = sizeof(uint32_t);
- int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen);
- if (ret != readLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv metalen, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen);
- return -1;
- }
-
- taosDecodeFixedU32(buf, &tlen);
-
- tsdbInfo("vgId:%d, metalen is received, readLen:%d, tlen:%d", REPO_ID(pRepo), readLen, tlen);
- if (tlen == 0) {
- pSynch->pmf = NULL;
- return 0;
- }
-
- if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
- tsdbError("vgId:%d, failed to makeroom while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen);
- if (ret != tlen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv metainfo, ret:%d tlen:%d", REPO_ID(pRepo), ret, tlen);
- return -1;
- }
-
- tsdbInfo("vgId:%d, metainfo is received, tlen:%d", REPO_ID(pRepo), tlen);
- if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
- terrno = TSDB_CODE_TDB_MESSED_MSG;
- tsdbError("vgId:%d, failed to checksum while recv metainfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- pSynch->pmf = &(pSynch->mf);
- tsdbDecodeSMFileEx(SYNC_BUFFER(pSynch), pSynch->pmf);
-
- return 0;
-}
-
-static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) {
- STsdb *pRepo = pSynch->pRepo;
- uint8_t decision = toSend;
-
- int32_t writeLen = sizeof(uint8_t);
- int32_t ret = taosWriteMsg(pSynch->socketFd, (void *)(&decision), writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to send decison, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen);
- return -1;
- }
-
- return 0;
-}
-
-static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
- STsdb *pRepo = pSynch->pRepo;
- uint8_t decision = 0;
-
- int32_t readLen = sizeof(uint8_t);
- int32_t ret = taosReadMsg(pSynch->socketFd, (void *)(&decision), readLen);
- if (ret != readLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv decison, ret:%d readLen:%d", REPO_ID(pRepo), ret, readLen);
- return -1;
- }
-
- *toSend = decision;
- return 0;
-}
-
-static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- STsdbFS * pfs = REPO_FS(pRepo);
- SFSIter fsiter;
- SDFileSet *pSet;
-
- tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
-
- do {
- pSet = tsdbFSIterNext(&fsiter);
- if (tsdbSyncSendDFileSet(pSynch, pSet) < 0) {
- tsdbError("vgId:%d, failed to send fileset:%d since %s", REPO_ID(pRepo), pSet ? pSet->fid : -1,
- tstrerror(terrno));
- return -1;
- }
-
- // No more file set to send, jut break
- if (pSet == NULL) {
- tsdbInfo("vgId:%d, no filesets any more", REPO_ID(pRepo));
- break;
- }
- } while (true);
-
- return 0;
-}
-
-static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- STsdbFS * pfs = REPO_FS(pRepo);
- SFSIter fsiter;
- SDFileSet *pLSet; // Local file set
-
- tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
-
- pLSet = tsdbFSIterNext(&fsiter);
- if (tsdbRecvDFileSetInfo(pSynch) < 0) {
- tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- while (true) {
- if (pLSet == NULL && pSynch->pdf == NULL) {
- tsdbInfo("vgId:%d, all filesets is disposed", REPO_ID(pRepo));
- break;
- } else {
- tsdbInfo("vgId:%d, fileset local:%d remote:%d, will be disposed", REPO_ID(pRepo), pLSet != NULL ? pLSet->fid : -1,
- pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
- }
-
- if (pLSet && (pSynch->pdf == NULL || pLSet->fid < pSynch->pdf->fid)) {
- // remote not has pLSet->fid set, just remove local (do nothing to remote the fset)
- tsdbInfo("vgId:%d, fileset:%d smaller than remote:%d, remove it", REPO_ID(pRepo), pLSet->fid,
- pSynch->pdf != NULL ? pSynch->pdf->fid : -1);
- pLSet = tsdbFSIterNext(&fsiter);
- } else {
- if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf) &&
- tsdbFSetIsOk(pLSet)) {
- // Just keep local files and notify remote not to send
- tsdbInfo("vgId:%d, fileset:%d is same and no need to recv", REPO_ID(pRepo), pLSet->fid);
-
- if (tsdbUpdateDFileSet(pfs, pLSet) < 0) {
- tsdbError("vgId:%d, failed to update fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- if (tsdbSendDecision(pSynch, false) < 0) {
- tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
- } else {
- // Need to copy from remote
- int fidLevel = tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn));
- if (fidLevel < 0) { // expired fileset
- tsdbInfo("vgId:%d, fileset:%d will be skipped as expired", REPO_ID(pRepo), pSynch->pdf->fid);
- if (tsdbSendDecision(pSynch, false) < 0) {
- tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
- // Move forward
- if (tsdbRecvDFileSetInfo(pSynch) < 0) {
- tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
- if (pLSet) {
- pLSet = tsdbFSIterNext(&fsiter);
- }
- // Next loop
- continue;
- } else {
- tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid);
- // Notify remote to send there file here
- if (tsdbSendDecision(pSynch, true) < 0) {
- tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
- }
-
- // Create local files and copy from remote
- SDiskID did;
- SDFileSet fset;
-
- tfsAllocDisk(fidLevel, &(did.level), &(did.id));
- if (did.level == TFS_UNDECIDED_LEVEL) {
- terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
- tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs), pSynch->pdf->ver);
-
- // Create new FSET
- if (tsdbCreateDFileSet(&fset, false) < 0) {
- tsdbError("vgId:%d, failed to create fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSynch->pdf); ftype++) {
- SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file
- SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file
-
- tsdbInfo("vgId:%d, file:%s will be received, osize:%" PRIu64 " rsize:%" PRIu64, REPO_ID(pRepo),
- pDFile->f.aname, pDFile->info.size, pRDFile->info.size);
-
- int64_t writeLen = pRDFile->info.size;
- int64_t ret = taosCopyFds(pSynch->socketFd, pDFile->fd, writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv file:%s since %s, ret:%" PRId64 " writeLen:%" PRId64, REPO_ID(pRepo),
- pDFile->f.aname, tstrerror(terrno), ret, writeLen);
- tsdbCloseDFileSet(&fset);
- tsdbRemoveDFileSet(&fset);
- return -1;
- }
-
- // Update new file info
- pDFile->info = pRDFile->info;
- tsdbInfo("vgId:%d, file:%s is received, size:%" PRId64, REPO_ID(pRepo), pDFile->f.aname, writeLen);
- }
-
- tsdbCloseDFileSet(&fset);
- if (tsdbUpdateDFileSet(pfs, &fset) < 0) {
- tsdbInfo("vgId:%d, fileset:%d failed to update since %s", REPO_ID(pRepo), fset.fid, tstrerror(terrno));
- return -1;
- }
-
- tsdbInfo("vgId:%d, fileset:%d is received", REPO_ID(pRepo), pSynch->pdf->fid);
- }
-
- // Move forward
- if (tsdbRecvDFileSetInfo(pSynch) < 0) {
- tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- if (pLSet) {
- pLSet = tsdbFSIterNext(&fsiter);
- }
- }
-
-#if 0
- if (pLSet == NULL) {
- // Copy from remote >>>>>>>>>>>
- } else {
- if (pSynch->pdf == NULL) {
- // Remove local file, just ignore ++++++++++++++
- pLSet = tsdbFSIterNext(&fsiter);
- } else {
- if (pLSet->fid < pSynch->pdf->fid) {
- // Remove local file, just ignore ++++++++++++
- pLSet = tsdbFSIterNext(&fsiter);
- } else if (pLSet->fid > pSynch->pdf->fid){
- // Copy from remote >>>>>>>>>>>>>>
- if (tsdbRecvDFileSetInfo(pSynch) < 0) {
- // TODO
- return -1;
- }
- } else {
- if (true/*TODO: is same fset*/) {
- // No need to copy ---------------------
- } else {
- // copy from remote >>>>>>>>>>>>>.
- }
- }
- }
- }
-#endif
- }
-
- return 0;
-}
-
-static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
- if (pSet1->ver != pSet2->ver) {
- return false;
- }
- for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet1); ftype++) {
- SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype);
- SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype);
-
- if (pDFile1->info.size != pDFile2->info.size || pDFile1->info.magic != pDFile2->info.magic) {
- return false;
- }
- }
-
- return true;
-}
-
-static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
- STsdb *pRepo = pSynch->pRepo;
- bool toSend = false;
-
- // skip expired fileset
- if (pSet && tsdbGetFidLevel(pSet->fid, &(pSynch->rtn)) < 0) {
- tsdbInfo("vgId:%d, don't sync send since fileset:%d smaller than minFid:%d", REPO_ID(pRepo), pSet->fid,
- pSynch->rtn.minFid);
- return 0;
- }
-
- if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) {
- tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet ? pSet->fid : -1, tstrerror(terrno));
- return -1;
- }
-
- // No file any more, no need to send file, just return
- if (pSet == NULL) {
- return 0;
- }
-
- if (tsdbRecvDecision(pSynch, &toSend) < 0) {
- tsdbError("vgId:%d, failed to recv decision while send fileset:%d since %s", REPO_ID(pRepo), pSet->fid,
- tstrerror(terrno));
- return -1;
- }
-
- if (toSend) {
- tsdbInfo("vgId:%d, fileset:%d will be sent", REPO_ID(pRepo), pSet->fid);
-
- for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) {
- SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype);
-
- if (tsdbOpenDFile(&df, O_RDONLY) < 0) {
- tsdbError("vgId:%d, failed to file:%s since %s", REPO_ID(pRepo), df.f.aname, tstrerror(terrno));
- return -1;
- }
-
- int64_t writeLen = df.info.size;
- tsdbInfo("vgId:%d, file:%s will be sent, size:%" PRId64, REPO_ID(pRepo), df.f.aname, writeLen);
-
- int64_t ret = taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to send file:%s since %s, ret:%" PRId64 " writeLen:%" PRId64, REPO_ID(pRepo),
- df.f.aname, tstrerror(terrno), ret, writeLen);
- tsdbCloseDFile(&df);
- return -1;
- }
-
- tsdbInfo("vgId:%d, file:%s is sent", REPO_ID(pRepo), df.f.aname);
- tsdbCloseDFile(&df);
- }
-
- tsdbInfo("vgId:%d, fileset:%d is sent", REPO_ID(pRepo), pSet->fid);
- } else {
- tsdbInfo("vgId:%d, fileset:%d is same, no need to send", REPO_ID(pRepo), pSet->fid);
- }
-
- return 0;
-}
-
-static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
- STsdb *pRepo = pSynch->pRepo;
- uint32_t tlen = 0;
-
- if (pSet) {
- tlen = tsdbEncodeDFileSetEx(NULL, pSet) + sizeof(TSCKSUM);
- }
-
- if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) {
- tsdbError("vgId:%d, failed to makeroom while send fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- void *ptr = SYNC_BUFFER(pSynch);
- taosEncodeFixedU32(&ptr, tlen);
- void *tptr = ptr;
- if (pSet) {
- tsdbEncodeDFileSetEx(&ptr, pSet);
- taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
- }
-
- int32_t writeLen = tlen + sizeof(uint32_t);
- int32_t ret = taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), writeLen);
- if (ret != writeLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to send fileinfo, ret:%d writeLen:%d", REPO_ID(pRepo), ret, writeLen);
- return -1;
- }
-
- return 0;
-}
-
-static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) {
- STsdb *pRepo = pSynch->pRepo;
- uint32_t tlen;
- char buf[64] = {0};
-
- int32_t readLen = sizeof(uint32_t);
- int32_t ret = taosReadMsg(pSynch->socketFd, buf, readLen);
- if (ret != readLen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
- }
-
- taosDecodeFixedU32(buf, &tlen);
-
- tsdbInfo("vgId:%d, fileinfo len:%d is received", REPO_ID(pRepo), tlen);
- if (tlen == 0) {
- pSynch->pdf = NULL;
- return 0;
- }
-
- if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
- tsdbError("vgId:%d, failed to makeroom while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- ret = taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen);
- if (ret != tlen) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- tsdbError("vgId:%d, failed to recv fileinfo, ret:%d readLen:%d", REPO_ID(pRepo), ret, tlen);
- return -1;
- }
-
- if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
- terrno = TSDB_CODE_TDB_MESSED_MSG;
- tsdbError("vgId:%d, failed to checksum while recv fileinfo since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- pSynch->pdf = &(pSynch->df);
- tsdbDecodeDFileSetEx(SYNC_BUFFER(pSynch), pSynch->pdf);
-
- return 0;
-}
-
-static int tsdbReload(STsdb *pRepo, bool isMfChanged) {
- // TODO: may need to stop and restart stream
- // if (isMfChanged) {
- tsdbCloseMeta(pRepo);
- tsdbFreeMeta(pRepo->tsdbMeta);
- pRepo->tsdbMeta = tsdbNewMeta(REPO_CFG(pRepo));
- tsdbOpenMeta(pRepo);
- tsdbLoadMetaCache(pRepo, true);
- // }
-
- tsdbUnRefMemTable(pRepo, pRepo->mem);
- tsdbUnRefMemTable(pRepo, pRepo->imem);
- pRepo->mem = NULL;
- pRepo->imem = NULL;
-
- if (tsdbRestoreInfo(pRepo) < 0) {
- tsdbError("vgId:%d failed to restore info from file since %s", REPO_ID(pRepo), tstrerror(terrno));
- return -1;
- }
-
- return 0;
-}
\ No newline at end of file