提交 7d006b6f 编写于 作者: H Hongze Cheng

first draft of sync

上级 cd6df491
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
void *cqH; void *cqH;
int (*notifyStatus)(void *, int status, int eno); int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *); int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema);
void (*cqDropFunc)(void *handle); void (*cqDropFunc)(void *handle);
} STsdbAppH; } STsdbAppH;
...@@ -76,17 +76,17 @@ typedef struct { ...@@ -76,17 +76,17 @@ typedef struct {
int64_t pointsWritten; // total data points written int64_t pointsWritten; // total data points written
} STsdbStat; } STsdbStat;
typedef void TSDB_REPO_T; // use void to hide implementation details from outside typedef struct STsdbRepo STsdbRepo;
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); STsdbCfg *tsdbGetCfg(const STsdbRepo *repo);
// --------- TSDB REPOSITORY DEFINITION // --------- TSDB REPOSITORY DEFINITION
int32_t tsdbCreateRepo(int repoid); int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(int repoid); int32_t tsdbDropRepo(int repoid);
TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(TSDB_REPO_T *repo); int tsdbGetState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
...@@ -110,18 +110,18 @@ typedef struct { ...@@ -110,18 +110,18 @@ typedef struct {
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(void *pTable); char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table)) #define TSDB_TABLEID(_table) ((STableId *)(_table))
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(STsdbRepo *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
...@@ -131,7 +131,7 @@ typedef struct STsdbRepoInfo { ...@@ -131,7 +131,7 @@ typedef struct STsdbRepoInfo {
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add // TODO: Other informations to add
} STsdbRepoInfo; } STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo); STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo);
// the meter information report structure // the meter information report structure
typedef struct { typedef struct {
...@@ -140,7 +140,7 @@ typedef struct { ...@@ -140,7 +140,7 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes int64_t tableTotalDiskSize; // In bytes
} STableInfo; } STableInfo;
STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid); STableInfo *tsdbGetTableInfo(STsdbRepo *pRepo, STableId tid);
// -- FOR INSERT DATA // -- FOR INSERT DATA
/** /**
...@@ -150,7 +150,7 @@ STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid); ...@@ -150,7 +150,7 @@ STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid);
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
...@@ -166,8 +166,8 @@ typedef struct STsdbQueryCond { ...@@ -166,8 +166,8 @@ typedef struct STsdbQueryCond {
typedef struct SMemRef { typedef struct SMemRef {
int32_t ref; int32_t ref;
void *mem; void * mem;
void *imem; void * imem;
} SMemRef; } SMemRef;
typedef struct SDataBlockInfo { typedef struct SDataBlockInfo {
...@@ -185,7 +185,7 @@ typedef struct { ...@@ -185,7 +185,7 @@ typedef struct {
typedef struct { typedef struct {
size_t numOfTables; size_t numOfTables;
SArray *pGroupList; SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableGroupInfo;
...@@ -199,7 +199,8 @@ typedef struct { ...@@ -199,7 +199,8 @@ typedef struct {
* @param qinfo query info handle from query processor * @param qinfo query info handle from query processor
* @return * @return
*/ */
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo, SMemRef* pRef); TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo,
SMemRef *pRef);
/** /**
* Get the last row of the given query time window for all the tables in STableGroupInfo object. * Get the last row of the given query time window for all the tables in STableGroupInfo object.
...@@ -211,14 +212,15 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab ...@@ -211,14 +212,15 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
* @param tableInfo table list. * @param tableInfo table list.
* @return * @return
*/ */
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo, SMemRef* pRef); TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo,
SMemRef *pRef);
/** /**
* get the queried table object list * get the queried table object list
* @param pHandle * @param pHandle
* @return * @return
*/ */
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
/** /**
* get the group list according to table id from client * get the group list according to table id from client
...@@ -228,8 +230,8 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); ...@@ -228,8 +230,8 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
* @param qinfo * @param qinfo
* @return * @return
*/ */
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo, SMemRef* pRef); void *qinfo, SMemRef *pRef);
/** /**
* move to next block if exists * move to next block if exists
...@@ -246,7 +248,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle); ...@@ -246,7 +248,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
* @param pBlockInfo * @param pBlockInfo
* @return * @return
*/ */
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo); void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);
/** /**
* *
...@@ -277,7 +279,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL ...@@ -277,7 +279,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL
* @param stableid. super table sid * @param stableid. super table sid
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
*/ */
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols); SColIndex *pColIndex, int32_t numOfCols);
...@@ -295,7 +297,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); ...@@ -295,7 +297,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
* @param pGroupInfo the generated result * @param pGroupInfo the generated result
* @return * @return
*/ */
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/** /**
* *
...@@ -304,7 +306,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, ST ...@@ -304,7 +306,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, ST
* @param pGroupInfo * @param pGroupInfo
* @return * @return
*/ */
int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/** /**
* clean up the query handle * clean up the query handle
...@@ -323,10 +325,14 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int ...@@ -323,10 +325,14 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
int tsdbInitCommitQueue(); int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue(); void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo); int tsdbSyncCommit(STsdbRepo *repo);
void tsdbIncCommitRef(int vgId); void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId); void tsdbDecCommitRef(int vgId);
// For TSDB file sync
int tsdbSyncSend(STsdbRepo *pRepo, int socketFd);
int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,16 +19,37 @@ ...@@ -19,16 +19,37 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct { typedef struct {
uint64_t uid; uint64_t uid;
int64_t offset; int64_t offset;
int64_t size; int64_t size;
} SKVRecord; } SKVRecord;
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -48,8 +48,6 @@ ...@@ -48,8 +48,6 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STsdbRepo STsdbRepo;
// Log // Log
#include "tsdbLog.h" #include "tsdbLog.h"
// Meta // Meta
...@@ -94,7 +92,7 @@ struct STsdbRepo { ...@@ -94,7 +92,7 @@ struct STsdbRepo {
int tsdbLockRepo(STsdbRepo* pRepo); int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
......
...@@ -17,13 +17,6 @@ ...@@ -17,13 +17,6 @@
#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_MAX_SUBBLOCKS 8
#define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days)) #define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days))
typedef struct {
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct { typedef struct {
SRtn rtn; // retention snapshot SRtn rtn; // retention snapshot
SFSIter fsIter; // tsdb file iterator SFSIter fsIter; // tsdb file iterator
...@@ -67,7 +60,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith); ...@@ -67,7 +60,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo);
static void tsdbDestroyCommitH(SCommitH *pCommith); static void tsdbDestroyCommitH(SCommitH *pCommith);
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
static int tsdbGetFidLevel(int fid, SRtn *pRtn); static int tsdbGetFidLevel(int fid, SRtn *pRtn);
static int tsdbNextCommitFid(SCommitH *pCommith); static int tsdbNextCommitFid(SCommitH *pCommith);
static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbCommitToTable(SCommitH *pCommith, int tid);
...@@ -203,6 +195,21 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { ...@@ -203,6 +195,21 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
return buf; return buf;
} }
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision);
pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision);
pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision);
}
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) { static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) {
char buf[64] = "\0"; char buf[64] = "\0";
void * pBuf = buf; void * pBuf = buf;
...@@ -552,33 +559,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { ...@@ -552,33 +559,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
} }
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision);
pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision);
pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision);
}
static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
static int tsdbNextCommitFid(SCommitH *pCommith) { static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
......
...@@ -59,7 +59,7 @@ int32_t tsdbDropRepo(int repoid) { ...@@ -59,7 +59,7 @@ int32_t tsdbDropRepo(int repoid) {
return tfsRmdir(tsdbDir); return tfsRmdir(tsdbDir);
} }
TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbRepo *pRepo; STsdbRepo *pRepo;
STsdbCfg config = *pCfg; STsdbCfg config = *pCfg;
...@@ -109,14 +109,14 @@ TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -109,14 +109,14 @@ TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo)); tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo));
return (TSDB_REPO_T *)pRepo; return pRepo;
} }
// Note: all working thread and query thread must stopped when calling this function // Note: all working thread and query thread must stopped when calling this function
int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
if (repo == NULL) return 0; if (repo == NULL) return 0;
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = repo;
int vgId = REPO_ID(pRepo); int vgId = REPO_ID(pRepo);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -144,7 +144,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -144,7 +144,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
} }
} }
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { STsdbCfg *tsdbGetCfg(const STsdbRepo *repo) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config; return &((STsdbRepo *)repo)->config;
} }
...@@ -187,11 +187,11 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ...@@ -187,11 +187,11 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
return 0; return 0;
} }
STsdbMeta *tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbMeta *tsdbGetMeta(STsdbRepo *pRepo) { return pRepo->tsdbMeta; }
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
int tsdbGetState(TSDB_REPO_T *repo) { return ((STsdbRepo *)repo)->state; } int tsdbGetState(STsdbRepo *repo) { return repo->state; }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
...@@ -201,7 +201,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int ...@@ -201,7 +201,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int
*compStorage = pRepo->stat.compStorage; *compStorage = pRepo->stat.compStorage;
} }
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases // TODO: think about multithread cases
return 0; return 0;
#if 0 #if 0
...@@ -253,7 +253,7 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { ...@@ -253,7 +253,7 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
#endif #endif
} }
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
// TODO // TODO
return 0; return 0;
#if 0 #if 0
......
...@@ -52,8 +52,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, ...@@ -52,8 +52,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = (STsdbRepo *)repo; STsdbRepo * pRepo = repo;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL; SSubmitBlk * pBlock = NULL;
int32_t affectedrows = 0; int32_t affectedrows = 0;
...@@ -236,8 +236,8 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -236,8 +236,8 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbSyncCommit(TSDB_REPO_T *repo) { int tsdbSyncCommit(STsdbRepo *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = repo;
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
......
...@@ -45,7 +45,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); ...@@ -45,7 +45,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
// ------------------ OUTER FUNCTIONS ------------------ // ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STable * super = NULL; STable * super = NULL;
...@@ -140,7 +140,7 @@ _err: ...@@ -140,7 +140,7 @@ _err:
return -1; return -1;
} }
int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { int tsdbDropTable(STsdbRepo *repo, STableId tableId) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
uint64_t uid = tableId.uid; uint64_t uid = tableId.uid;
...@@ -293,7 +293,7 @@ static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) { ...@@ -293,7 +293,7 @@ static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) {
return (colId < p2->colId)? -1:1; return (colId < p2->colId)? -1:1;
} }
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STSchema * pNewSchema = NULL; STSchema * pNewSchema = NULL;
......
...@@ -275,7 +275,7 @@ static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) ...@@ -275,7 +275,7 @@ static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey)
return pNew; return pNew;
} }
static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) { static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
goto out_of_memory; goto out_of_memory;
...@@ -295,7 +295,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* ...@@ -295,7 +295,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
pQueryHandle->locateStart = false; pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef; pQueryHandle->pMemRef = pMemRef;
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
goto out_of_memory; goto out_of_memory;
} }
...@@ -354,7 +354,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* ...@@ -354,7 +354,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
return NULL; return NULL;
} }
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) { TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo, pRef); STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo, pRef);
STsdbMeta* pMeta = tsdbGetMeta(tsdb); STsdbMeta* pMeta = tsdbGetMeta(tsdb);
...@@ -372,7 +372,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -372,7 +372,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
return (TsdbQueryHandleT) pQueryHandle; return (TsdbQueryHandleT) pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
// no qualified table // no qualified table
...@@ -408,7 +408,7 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { ...@@ -408,7 +408,7 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
return res; return res;
} }
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) { TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pRef);
if (pQueryHandle != NULL) { if (pQueryHandle != NULL) {
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
...@@ -2720,7 +2720,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -2720,7 +2720,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) { SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
...@@ -2815,7 +2815,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, co ...@@ -2815,7 +2815,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, co
return terrno; return terrno;
} }
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
...@@ -2845,7 +2845,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, ST ...@@ -2845,7 +2845,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, ST
return terrno; return terrno;
} }
int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) { if (tsdbRLockRepoMeta(tsdb) < 0) {
return terrno; return terrno;
} }
......
...@@ -15,363 +15,543 @@ ...@@ -15,363 +15,543 @@
#include "tsdbint.h" #include "tsdbint.h"
static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf); // Sync handle
static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd); typedef struct {
static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet); STsdbRepo *pRepo;
static int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd); SRtn rtn;
static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); int socketFd;
void * pBuf;
SMFile * pmf;
SMFile mf;
SDFileSet df;
SDFileSet *pdf;
} SSyncH;
#define SYNC_BUFFER(sh) ((sh)->pBuf)
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd);
static void tsdbDestroySyncH(SSyncH *pSyncH);
static int tsdbSyncSendMeta(SSyncH *pSynch);
static int tsdbSyncRecvMeta(SSyncH *pSynch);
static int tsdbSendMetaInfo(SSyncH *pSynch);
static int tsdbRecvMetaInfo(SSyncH *pSynch);
static int tsdbSendDecision(SSyncH *pSynch, bool toSend);
static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend);
static int tsdbSyncSendDFileSetArray(SSyncH *pSynch);
static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch);
static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2);
static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet);
static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet);
static int tsdbRecvDFileSetInfo(SSyncH *pSynch);
int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) { int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) {
STsdbFS * pfs = REPO_FS(pRepo); SSyncH synch = {0};
SFSIter fsiter;
SDFileSet *pSet;
// Disable commit while syncing TSDB files
sem_wait(&(pRepo->readyToCommit));
// Sync send meta file tsdbInitSyncH(&synch, pRepo, socketFd);
if (tsdbSyncSendMeta(pRepo, socketFd, pfs->cstatus->pmf) < 0) { // Disable TSDB commit
tsdbError("vgId:%d failed to sync send meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
sem_post(&(pRepo->readyToCommit)); sem_post(&(pRepo->readyToCommit));
return -1;
}
// Sync send SDFileSet
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) { if (tsdbSyncSendMeta(&synch) < 0) {
if (tsdbSyncSendDFileSet(pRepo, socketFd, pSet) < 0) { tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
sem_post(&(pRepo->readyToCommit)); goto _err;
return -1;
} }
if (tsdbSyncSendDFileSetArray(&synch) < 0) {
tsdbError("vgId:%d failed to send data file set array since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
} }
// Enable commit // Enable TSDB commit
sem_post(&(pRepo->readyToCommit)); sem_wait(&(pRepo->readyToCommit));
tsdbDestroySyncH(&synch);
return 0; return 0;
_err:
sem_wait(&(pRepo->readyToCommit));
tsdbDestroySyncH(&synch);
return -1;
} }
int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) { int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) {
SFSIter fsiter; SSyncH synch;
SDFileSet *pSet;
SDFileSet dset;
SDFileSet *pRecvSet = &dset;
uint32_t tlen;
char buf[128];
void * pBuf = NULL;
tsdbInitSyncH(&synch, pRepo, socketFd);
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
// Sync recv meta file from remote if (tsdbSyncRecvMeta(&synch) < 0) {
if (tsdbSyncRecvMeta(pRepo, socketFd) < 0) { tsdbError("vgId:%d failed to sync recv meta file from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
// TODO
goto _err; goto _err;
} }
// Sync recv SDFileSet if (tsdbSyncRecvDFileSetArray(&synch) < 0) {
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); tsdbError("vgId:%d failed to sync recv data file set from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
pSet = tsdbFSIterNext(&fsiter);
if (taosReadMsg(socketFd, buf, sizeof(uint32_t)) < 0) {
// TODO
goto _err; goto _err;
} }
taosDecodeFixedU32(buf, &tlen); // TODO: need to restart TSDB or reload TSDB here
if (tlen == 0) {
// No more remote files
pRecvSet = NULL;
} else {
// Has remote files
if (tsdbMakeRoom(&pBuf, tlen) < 0) {
// TODO
goto _err;
}
if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { tsdbEndFSTxn(pRepo);
// TODO tsdbDestroySyncH(&synch);
goto _err; return 0;
}
_err:
tsdbEndFSTxnWithError(REPO_FS(pRepo));
tsdbDestroySyncH(&synch);
return -1;
}
static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) {
pSyncH->pRepo = pRepo;
pSyncH->socketFd = socketFd;
tsdbGetRtnSnap(pRepo, &(pSyncH->rtn));
}
pRecvSet = &dset; static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); }
tsdbDecodeDFileSet(pBuf, pRecvSet);
// ============ SYNC META API
static int tsdbSyncSendMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
bool toSendMeta = false;
SMFile mf;
// Send meta info to remote
if (tsdbSendMetaInfo(pSynch) < 0) {
tsdbError("vgId:%d failed to send meta file info since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
} }
while (true) { if (pRepo->fs->cstatus->pmf == NULL) {
if (pSet == NULL && pRecvSet == NULL) break; // No meta file, not need to wait to retrieve meta file
return 0;
}
if (pSet == NULL) { if (tsdbRecvDecision(pSynch, &toSendMeta) < 0) {
// TODO: local not has, copy from remote tsdbError("vgId:%d failed to recv send file decision since %s", REPO_ID(pRepo), tstrerror(terrno));
// Process the next remote fset(next pRecvSet) return -1;
} else {
if (pRecvSet == NULL) {
// Remote not has, just remove this file
pSet = tsdbFSIterNext(&fsiter);
} else {
if (pSet->fid == pRecvSet->fid) {
if (tsdbIsFSetSame(pSet, pRecvSet)) {
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
} else {
// Copy from remote
} }
pSet = tsdbFSIterNext(&fsiter);
// Process the next remote fset if (toSendMeta) {
} else if (pSet->fid < pRecvSet->fid) { tsdbInitMFileEx(&mf, pRepo->fs->cstatus->pmf);
// Remote has not, just remove this file if (tsdbOpenMFile(&mf, O_RDONLY) < 0) {
pSet = tsdbFSIterNext(&fsiter); tsdbError("vgId:%d failed to open meta file while sync send meta since %s", REPO_ID(pRepo), tstrerror(terrno));
} else { return -1;
// not has, copy pRecvSet from remote
// Process the next remote fset
} }
if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, mf.info.size) < mf.info.size) {
tsdbError("vgId:%d failed to copy meta file to remote since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbCloseMFile(&mf);
return -1;
} }
tsdbCloseMFile(&mf);
} }
return 0;
}
static int tsdbSyncRecvMeta(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
SMFile * pLMFile = pRepo->fs->cstatus->pmf;
// Recv meta info from remote
if (tsdbRecvMetaInfo(pSynch) < 0) {
tsdbError("vgId:%d failed to recv meta info from remote since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
} }
tsdbEndFSTxn(pRepo); // No meta file, do nothing (rm local meta file)
if (pSynch->pmf == NULL) {
return 0; return 0;
}
_err: if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) {
taosTZfree(pBuf); // Local has no meta file or has a different meta file, need to copy from remote
tsdbEndFSTxnWithError(REPO_FS(pRepo)); if (tsdbSendDecision(pSynch, true) < 0) {
// TODO
return -1; return -1;
} }
static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf) { // Recv from remote
void * pBuf = NULL;
uint32_t tlen = 0;
void * ptr;
SMFile mf; SMFile mf;
SMFile * pMFile = NULL; SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateMFile(&mf, false) < 0) {
// TODO
return -1;
}
if (pmf) { if (taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), pSynch->pmf->info.size) < pSynch->pmf->info.size) {
// copy out // TODO
mf = *pmf; tsdbCloseMFile(&mf);
pMFile = &mf; tsdbRemoveMFile(&mf);
return -1;
} }
tsdbCloseMFile(&mf);
tsdbUpdateMFile(REPO_FS(pRepo), &mf);
} else {
if (tsdbSendDecision(pSynch, false) < 0) {
// TODO
return -1;
}
}
return 0;
}
static int tsdbSendMetaInfo(SSyncH *pSynch) {
STsdbRepo *pRepo = pSynch->pRepo;
uint32_t tlen = 0;
SMFile * pMFile = pRepo->fs->cstatus->pmf;
if (pMFile) { if (pMFile) {
tlen = tsdbEncodeMFInfo(NULL, TSDB_FILE_INFO(pMFile)) + sizeof(TSCKSUM); tlen = tlen + tsdbEncodeSMFile(NULL, pMFile) + sizeof(TSCKSUM);
} }
if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
return -1; return -1;
} }
ptr = pBuf; void *ptr = SYNC_BUFFER(pSynch);
taosEncodeFixedU32(&ptr, tlen); taosEncodeFixedU32(&ptr, tlen);
void *tptr = ptr;
if (pMFile) { if (pMFile) {
tsdbEncodeMFInfo(&ptr, TSDB_FILE_INFO(pMFile)); tsdbEncodeSMFile(&ptr, pMFile);
taosCalcChecksumAppend(0, (uint8_t *)pBuf, POINTER_DISTANCE(ptr, pBuf)); taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM));
} }
if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(uint32_t)) < tlen + sizeof(uint32_t)) {
tsdbError("vgId:%d failed to sync meta file since %s", REPO_ID(pRepo), strerror(errno)); tsdbError("vgId:%d failed to send sync meta file info to remote since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; return -1;
} }
if (pMFile == NULL) {
// No meta file, no need to send
return 0; return 0;
}
static int tsdbRecvMetaInfo(SSyncH *pSynch) {
uint32_t tlen;
char buf[64] = "\0";
if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) {
// TODO
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
bool shouldSend = false; taosDecodeFixedU32(buf, &tlen);
{
// TODO: Recv command to know if need to send file if (tlen == 0) {
pSynch->pmf = NULL;
return 0;
} }
if (shouldSend) { if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { return -1;
tsdbError("vgId:%d failed to open meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
} }
if (taosSendFile(socketFd, TSDB_FILE_FD(pMFile), 0, pMFile->info.size) < pMFile->info.size) { if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) {
tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), strerror(errno)); // TODO
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseMFile(pMFile); return -1;
goto _err;
} }
tsdbCloseMFile(pMFile); if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
// TODO
terrno = TSDB_CODE_TDB_MESSED_MSG;
return -1;
} }
pSynch->pmf = &(pSynch->mf);
tsdbDecodeSMFile(SYNC_BUFFER(pSynch), pSynch->pmf);
return 0; return 0;
}
_err: static int tsdbSendDecision(SSyncH *pSynch, bool toSend) {
taosTZfree(pBuf); uint8_t decision = toSend;
if (taosWriteMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) {
// TODO
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
}
return 0;
} }
static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd) { static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend) {
uint32_t tlen; uint8_t decision;
char buf[128];
void * pBuf = NULL;
SMFInfo mfInfo;
SMFile * pMFile = pRepo->fs->cstatus->pmf;
SMFile mf;
if (taosReadMsg(socketFd, (void *)buf, sizeof(int32_t)) < sizeof(int32_t)) { if (taosReadMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) {
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); // TODO
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; return -1;
} }
taosDecodeFixedU32(buf, &tlen);
// Remote not has meta file, just remove meta file (do nothing) *toSend = decision;
if (tlen == 0) {
// TODO: need to notify remote?
return 0; return 0;
} }
if (tsdbMakeRoom(&pBuf, tlen) < 0) { // ========== SYNC DATA FILE SET ARRAY API
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) {
goto _err; STsdbRepo *pRepo = pSynch->pRepo;
} STsdbFS * pfs = REPO_FS(pRepo);
SFSIter fsiter;
SDFileSet *pSet;
if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); do {
goto _err; pSet = tsdbFSIterNext(&fsiter);
if (tsdbSyncSendDFileSet(pSynch, pSet) < 0) {
// TODO
return -1;
} }
if (!taosCheckChecksumWhole((uint8_t *)pBuf, tlen)) { // No more file set to send, jut break
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); if (pSet == NULL) {
terrno = TSDB_CODE_TDB_MESSED_MSG; break;
goto _err;
} }
} while (true);
void *ptr = pBuf; return 0;
ptr = tsdbDecodeMFInfo(ptr, &mfInfo); }
if (pMFile != NULL && memcmp(&(pMFile->info), &mfInfo, sizeof(SMInfo)) == 0) { static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) {
// has file and same as remote, just keep the old one STsdbRepo *pRepo = pSynch->pRepo;
tsdbUpdateMFile(REPO_FS(pRepo), pMFile); STsdbFS * pfs = REPO_FS(pRepo);
// Notify remote that no need to send meta file SFSIter fsiter;
{ SDFileSet *pLSet; // Local file set
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
pLSet = tsdbFSIterNext(&fsiter);
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO // TODO
return -1;
} }
} else {
// Need to copy meta file from remote
SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID};
tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)));
mf.info = mfInfo;
// Create new file while (true) {
if (tsdbCreateMFile(&mf, false) < 0) { if (pLSet == NULL && pSynch->pdf == NULL) break;
tsdbError("vgId:%d failed to create meta file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; if (pLSet && (pSynch->pdf == NULL || pLSet->fid < pSynch->pdf->fid)) {
// remote not has pLSet->fid set, just remove local (do nothing to remote the fset)
pLSet = tsdbFSIterNext(&fsiter);
} else {
if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) {
// Just keep local files and notify remote not to send
if (tsdbUpdateDFileSet(pfs, pLSet) < 0) {
// TODO
return -1;
} }
// Notify remote to send meta file if (tsdbSendDecision(pSynch, false) < 0) {
{
// TODO // TODO
return -1;
} }
} else {
// Need to copy from remote
if (taosCopyFds(socketFd, mf.fd, mfInfo.size) < 0) { // Notify remote to send there file here
tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); if (tsdbSendDecision(pSynch, true) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); // TODO
tsdbCloseMFile(&mf); return -1;
tsdbRemoveMFile(&mf);
goto _err;
} }
TSDB_FILE_FSYNC(&mf); // Create local files and copy from remote
tsdbCloseMFile(&mf); SDiskID did;
tsdbUpdateMFile(REPO_FS(pRepo), &mf); SDFileSet fset;
tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
} }
return 0; tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs));
_err: // Create new FSET
taosTZfree(pBuf); if (tsdbCreateDFileSet(&fset, false) < 0) {
// TODO
return -1; return -1;
} }
static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
void * pBuf = NULL; SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file
uint32_t tlen = 0; SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file
void * ptr; if (taosCopyFds(pSynch->socketFd, pDFile->fd, pRDFile->info.size) < pRDFile->info.size) {
SDFileSet dset; // TODO
SDFileSet *pSet = NULL; terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseDFileSet(&fset);
tsdbRemoveDFileSet(&fset);
return -1;
}
// Update new file info
pDFile->info = pRDFile->info;
}
if (pOSet) { tsdbCloseDFileSet(&fset);
dset = *pOSet; if (tsdbUpdateDFileSet(pfs, &fset) < 0) {
pSet = &dset; // TODO
return -1;
}
} }
if (pSet) { // Move forward
tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM); if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO
return -1;
} }
if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { if (pLSet) {
pLSet = tsdbFSIterNext(&fsiter);
}
}
#if 0
if (pLSet == NULL) {
// Copy from remote >>>>>>>>>>>
} else {
if (pSynch->pdf == NULL) {
// Remove local file, just ignore ++++++++++++++
pLSet = tsdbFSIterNext(&fsiter);
} else {
if (pLSet->fid < pSynch->pdf->fid) {
// Remove local file, just ignore ++++++++++++
pLSet = tsdbFSIterNext(&fsiter);
} else if (pLSet->fid > pSynch->pdf->fid){
// Copy from remote >>>>>>>>>>>>>>
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO
return -1; return -1;
} }
} else {
if (true/*TODO: is same fset*/) {
// No need to copy ---------------------
} else {
// copy from remote >>>>>>>>>>>>>.
}
}
}
}
#endif
}
ptr = pBuf; return 0;
taosEncodeFixedU32(&ptr, tlen); }
if (pSet) {
tsdbEncodeDFileSet(&ptr, pSet); static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) {
taosCalcChecksumAppend(0, (uint8_t *)pBuf, tlen); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM)); SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype);
SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype);
if (memcmp((void *)(TSDB_FILE_INFO(pDFile1)), (void *)(TSDB_FILE_INFO(pDFile2)), sizeof(SDFInfo)) != 0) {
return false;
} }
}
return true;
}
if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) {
bool toSend = false;
if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) {
// TODO // TODO
goto _err; return -1;
} }
// No file any more, no need to send file, just return
if (pSet == NULL) { if (pSet == NULL) {
// No need to wait
return 0; return 0;
} }
bool shouldSend = false; if (tsdbRecvDecision(pSynch, &toSend) < 0) {
{ return -1;
// TODO: Recv command to know if need to send file
} }
if (shouldSend) { if (toSend) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype);
if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) { if (tsdbOpenDFile(&df, O_RDONLY) < 0) {
// TODO return -1;
goto _err;
} }
if (taosSendFile(socketFd, TSDB_FILE_FD(pDFile), 0, pDFile->info.size) < pDFile->info.size) { if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, df.info.size) < df.info.size) {
// TODO tsdbCloseDFile(&df);
tsdbCloseDFile(pDFile); return -1;
goto _err;
} }
tsdbCloseDFile(pDFile); tsdbCloseDFile(&df);
} }
} }
taosTZfree(pBuf);
return 0; return 0;
}
_err: static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) {
taosTZfree(pBuf); uint32_t tlen = 0;
if (pSet) {
tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM);
}
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) {
return -1; return -1;
} }
static UNUSED_FUNC int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd) { void *ptr = SYNC_BUFFER(pSynch);
taosEncodeFixedU32(&ptr, tlen);
void *tptr = ptr;
if (pSet) {
tsdbEncodeDFileSet(&ptr, pSet);
taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen);
}
if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(tlen)) < tlen + sizeof(tlen)) {
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0; return 0;
} }
static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { static int tsdbRecvDFileSetInfo(SSyncH *pSynch) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { uint32_t tlen;
if (memcmp(TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet1, ftype)), TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet2, ftype)), char buf[64] = "\0";
sizeof(SDFInfo)) != 0) {
return false; if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
taosDecodeFixedU32(buf, &tlen);
if (tlen == 0) {
pSynch->pdf = NULL;
return 0;
} }
if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) {
return -1;
} }
return true; if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) {
terrno = TSDB_CODE_TDB_MESSED_MSG;
return -1;
}
pSynch->pdf = &(pSynch->df);
tsdbDecodeDFileSet(SYNC_BUFFER(pSynch), pSynch->pdf);
return 0;
} }
\ No newline at end of file
...@@ -12,7 +12,7 @@ static double getCurTime() { ...@@ -12,7 +12,7 @@ static double getCurTime() {
} }
typedef struct { typedef struct {
TSDB_REPO_T *pRepo; STsdbRepo *pRepo;
bool isAscend; bool isAscend;
int tid; int tid;
uint64_t uid; uint64_t uid;
...@@ -143,7 +143,7 @@ TEST(TsdbTest, testInsertSpeed) { ...@@ -143,7 +143,7 @@ TEST(TsdbTest, testInsertSpeed) {
// Create and open repository // Create and open repository
tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1); tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1);
tsdbCreateRepo(rootDir, &tsdbCfg); tsdbCreateRepo(rootDir, &tsdbCfg);
TSDB_REPO_T *repo = tsdbOpenRepo(rootDir, NULL); STsdbRepo *repo = tsdbOpenRepo(rootDir, NULL);
ASSERT_NE(repo, nullptr); ASSERT_NE(repo, nullptr);
// Create table // Create table
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册