From 7d006b6f8eedd19a7d32189ff8df503cfa9a704d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 19 Jan 2021 06:38:20 +0000 Subject: [PATCH] first draft of sync --- src/inc/tsdb.h | 78 +++-- src/tsdb/inc/tsdbCommit.h | 21 ++ src/tsdb/inc/tsdbint.h | 10 +- src/tsdb/src/tsdbCommit.c | 50 +-- src/tsdb/src/tsdbMain.c | 20 +- src/tsdb/src/tsdbMemTable.c | 8 +- src/tsdb/src/tsdbMeta.c | 6 +- src/tsdb/src/tsdbRead.c | 16 +- src/tsdb/src/tsdbSync.c | 662 ++++++++++++++++++++++------------- src/tsdb/tests/tsdbTests.cpp | 4 +- 10 files changed, 530 insertions(+), 345 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index aa4c960279..23d2fbc78c 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -48,7 +48,7 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status, int eno); int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema); + void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema); void (*cqDropFunc)(void *handle); } STsdbAppH; @@ -76,17 +76,17 @@ typedef struct { int64_t pointsWritten; // total data points written } STsdbStat; -typedef void TSDB_REPO_T; // use void to hide implementation details from outside +typedef struct STsdbRepo STsdbRepo; -STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); +STsdbCfg *tsdbGetCfg(const STsdbRepo *repo); // --------- TSDB REPOSITORY DEFINITION -int32_t tsdbCreateRepo(int repoid); -int32_t tsdbDropRepo(int repoid); -TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); -int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); -int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); -int tsdbGetState(TSDB_REPO_T *repo); +int32_t tsdbCreateRepo(int repoid); +int32_t tsdbDropRepo(int repoid); +STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); +int tsdbCloseRepo(STsdbRepo *repo, int toCommit); +int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); +int tsdbGetState(STsdbRepo *repo); // --------- TSDB TABLE DEFINITION typedef struct { @@ -110,18 +110,18 @@ typedef struct { void tsdbClearTableCfg(STableCfg *config); -void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); -char* tsdbGetTableName(void *pTable); +void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes); +char *tsdbGetTableName(void *pTable); -#define TSDB_TABLEID(_table) ((STableId*) (_table)) +#define TSDB_TABLEID(_table) ((STableId *)(_table)) STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); -int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); -int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); -int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); +int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg); +int tsdbDropTable(STsdbRepo *pRepo, STableId tableId); +int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg); -uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); +uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); // the TSDB repository info typedef struct STsdbRepoInfo { @@ -131,7 +131,7 @@ typedef struct STsdbRepoInfo { int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository // TODO: Other informations to add } STsdbRepoInfo; -STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo); +STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo); // the meter information report structure typedef struct { @@ -140,7 +140,7 @@ typedef struct { int64_t tableTotalDataSize; // In bytes int64_t tableTotalDiskSize; // In bytes } STableInfo; -STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid); +STableInfo *tsdbGetTableInfo(STsdbRepo *pRepo, STableId tid); // -- FOR INSERT DATA /** @@ -150,7 +150,7 @@ STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid); * * @return the number of points inserted, -1 for failure and the error number is set */ -int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); +int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); // -- FOR QUERY TIME SERIES DATA @@ -165,9 +165,9 @@ typedef struct STsdbQueryCond { } STsdbQueryCond; typedef struct SMemRef { - int32_t ref; - void *mem; - void *imem; + int32_t ref; + void * mem; + void * imem; } SMemRef; typedef struct SDataBlockInfo { @@ -179,14 +179,14 @@ typedef struct SDataBlockInfo { } SDataBlockInfo; typedef struct { - void *pTable; - TSKEY lastKey; + void *pTable; + TSKEY lastKey; } STableKeyInfo; typedef struct { size_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid + SArray * pGroupList; + SHashObj *map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; /** @@ -199,7 +199,8 @@ typedef struct { * @param qinfo query info handle from query processor * @return */ -TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo, SMemRef* pRef); +TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo, + SMemRef *pRef); /** * Get the last row of the given query time window for all the tables in STableGroupInfo object. @@ -211,14 +212,15 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab * @param tableInfo table list. * @return */ -TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo, SMemRef* pRef); +TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo, + SMemRef *pRef); /** * get the queried table object list * @param pHandle * @return */ -SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); +SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); /** * get the group list according to table id from client @@ -228,8 +230,8 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); * @param qinfo * @return */ -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, - void *qinfo, SMemRef* pRef); +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, + void *qinfo, SMemRef *pRef); /** * move to next block if exists @@ -246,7 +248,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle); * @param pBlockInfo * @return */ -void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo); +void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo); /** * @@ -277,7 +279,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL * @param stableid. super table sid * @param pTagCond. tag query condition */ -int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, +int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); @@ -295,7 +297,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); * @param pGroupInfo the generated result * @return */ -int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); +int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); /** * @@ -304,7 +306,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, ST * @param pGroupInfo * @return */ -int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo); +int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); /** * clean up the query handle @@ -323,10 +325,14 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(); void tsdbDestroyCommitQueue(); -int tsdbSyncCommit(TSDB_REPO_T *repo); +int tsdbSyncCommit(STsdbRepo *repo); void tsdbIncCommitRef(int vgId); void tsdbDecCommitRef(int vgId); +// For TSDB file sync +int tsdbSyncSend(STsdbRepo *pRepo, int socketFd); +int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index 928ddb353e..969733e598 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -19,16 +19,37 @@ #ifdef __cplusplus extern "C" { #endif + +typedef struct { + int minFid; + int midFid; + int maxFid; + TSKEY minKey; +} SRtn; + typedef struct { uint64_t uid; int64_t offset; int64_t size; } SKVRecord; +void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbCommitData(STsdbRepo *pRepo); +static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { + if (fid >= pRtn->maxFid) { + return 0; + } else if (fid >= pRtn->midFid) { + return 1; + } else if (fid >= pRtn->minFid) { + return 2; + } else { + return -1; + } +} + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index fcd5ffa6a7..d0c575a876 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -48,8 +48,6 @@ extern "C" { #endif -typedef struct STsdbRepo STsdbRepo; - // Log #include "tsdbLog.h" // Meta @@ -92,10 +90,10 @@ struct STsdbRepo { #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) -int tsdbLockRepo(STsdbRepo* pRepo); -int tsdbUnlockRepo(STsdbRepo* pRepo); -STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo); +int tsdbLockRepo(STsdbRepo* pRepo); +int tsdbUnlockRepo(STsdbRepo* pRepo); +STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo); static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { ASSERT(pRepo != NULL); diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 1bed22bf23..1470d13b90 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -17,13 +17,6 @@ #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days)) -typedef struct { - int minFid; - int midFid; - int maxFid; - TSKEY minKey; -} SRtn; - typedef struct { SRtn rtn; // retention snapshot SFSIter fsIter; // tsdb file iterator @@ -67,7 +60,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); static void tsdbDestroyCommitH(SCommitH *pCommith); -static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); static int tsdbGetFidLevel(int fid, SRtn *pRtn); static int tsdbNextCommitFid(SCommitH *pCommith); static int tsdbCommitToTable(SCommitH *pCommith, int tid); @@ -203,6 +195,21 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { return buf; } +void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + TSKEY minKey, midKey, maxKey, now; + + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; + + pRtn->minKey = minKey; + pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision); + pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision); + pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision); +} + static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) { char buf[64] = "\0"; void * pBuf = buf; @@ -552,33 +559,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { - STsdbCfg *pCfg = REPO_CFG(pRepo); - TSKEY minKey, midKey, maxKey, now; - - now = taosGetTimestamp(pCfg->precision); - minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision]; - midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision]; - maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; - - pRtn->minKey = minKey; - pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision); - pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision); - pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision); -} - -static int tsdbGetFidLevel(int fid, SRtn *pRtn) { - if (fid >= pRtn->maxFid) { - return 0; - } else if (fid >= pRtn->midFid) { - return 1; - } else if (fid >= pRtn->minFid) { - return 2; - } else { - return -1; - } -} - static int tsdbNextCommitFid(SCommitH *pCommith) { STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index cfc84e4656..f0780b207b 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -59,7 +59,7 @@ int32_t tsdbDropRepo(int repoid) { return tfsRmdir(tsdbDir); } -TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { +STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { STsdbRepo *pRepo; STsdbCfg config = *pCfg; @@ -109,14 +109,14 @@ TSDB_REPO_T *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo)); - return (TSDB_REPO_T *)pRepo; + return pRepo; } // Note: all working thread and query thread must stopped when calling this function -int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { +int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { if (repo == NULL) return 0; - STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbRepo *pRepo = repo; int vgId = REPO_ID(pRepo); terrno = TSDB_CODE_SUCCESS; @@ -144,7 +144,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { } } -STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { +STsdbCfg *tsdbGetCfg(const STsdbRepo *repo) { ASSERT(repo != NULL); return &((STsdbRepo *)repo)->config; } @@ -187,11 +187,11 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { return 0; } -STsdbMeta *tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } +STsdbMeta *tsdbGetMeta(STsdbRepo *pRepo) { return pRepo->tsdbMeta; } -STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } +STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; } -int tsdbGetState(TSDB_REPO_T *repo) { return ((STsdbRepo *)repo)->state; } +int tsdbGetState(STsdbRepo *repo) { return repo->state; } void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { ASSERT(repo != NULL); @@ -201,7 +201,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int *compStorage = pRepo->stat.compStorage; } -int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { +int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases return 0; #if 0 @@ -253,7 +253,7 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { #endif } -uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { +uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { // TODO return 0; #if 0 diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 705997cc99..30e1d9148c 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -52,8 +52,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); -int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { - STsdbRepo * pRepo = (STsdbRepo *)repo; +int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { + STsdbRepo * pRepo = repo; SSubmitMsgIter msgIter = {0}; SSubmitBlk * pBlock = NULL; int32_t affectedrows = 0; @@ -236,8 +236,8 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { return 0; } -int tsdbSyncCommit(TSDB_REPO_T *repo) { - STsdbRepo *pRepo = (STsdbRepo *)repo; +int tsdbSyncCommit(STsdbRepo *repo) { + STsdbRepo *pRepo = repo; tsdbAsyncCommit(pRepo); sem_wait(&(pRepo->readyToCommit)); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index b45f5b9803..9b407dae48 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -45,7 +45,7 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); // ------------------ OUTER FUNCTIONS ------------------ -int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { +int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; STable * super = NULL; @@ -140,7 +140,7 @@ _err: return -1; } -int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { +int tsdbDropTable(STsdbRepo *repo, STableId tableId) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; uint64_t uid = tableId.uid; @@ -293,7 +293,7 @@ static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) { return (colId < p2->colId)? -1:1; } -int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { +int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; STSchema * pNewSchema = NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 7553fe86e1..4aaad39482 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -275,7 +275,7 @@ static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) return pNew; } -static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) { +static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) { STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); if (pQueryHandle == NULL) { goto out_of_memory; @@ -295,7 +295,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pQueryHandle->locateStart = false; pQueryHandle->pMemRef = pMemRef; - if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) { + if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) { goto out_of_memory; } @@ -354,7 +354,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* return NULL; } -TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) { +TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) { STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo, pRef); STsdbMeta* pMeta = tsdbGetMeta(tsdb); @@ -372,7 +372,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab return (TsdbQueryHandleT) pQueryHandle; } -TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { +TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) { pCond->twindow = updateLastrowForEachGroup(groupList); // no qualified table @@ -408,7 +408,7 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { return res; } -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) { +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) { STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pRef); if (pQueryHandle != NULL) { pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; @@ -2720,7 +2720,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, +int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; @@ -2815,7 +2815,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, co return terrno; } -int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { +int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); @@ -2845,7 +2845,7 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, ST return terrno; } -int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { +int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { if (tsdbRLockRepoMeta(tsdb) < 0) { return terrno; } diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 4e7dcb23e9..dac0e31e40 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -15,363 +15,543 @@ #include "tsdbint.h" -static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf); -static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd); -static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet); -static int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd); -static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); +// Sync handle +typedef struct { + STsdbRepo *pRepo; + SRtn rtn; + int socketFd; + void * pBuf; + SMFile * pmf; + SMFile mf; + SDFileSet df; + SDFileSet *pdf; +} SSyncH; + +#define SYNC_BUFFER(sh) ((sh)->pBuf) + +static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd); +static void tsdbDestroySyncH(SSyncH *pSyncH); +static int tsdbSyncSendMeta(SSyncH *pSynch); +static int tsdbSyncRecvMeta(SSyncH *pSynch); +static int tsdbSendMetaInfo(SSyncH *pSynch); +static int tsdbRecvMetaInfo(SSyncH *pSynch); +static int tsdbSendDecision(SSyncH *pSynch, bool toSend); +static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend); +static int tsdbSyncSendDFileSetArray(SSyncH *pSynch); +static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch); +static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); +static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet); +static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet); +static int tsdbRecvDFileSetInfo(SSyncH *pSynch); int tsdbSyncSend(STsdbRepo *pRepo, int socketFd) { - STsdbFS * pfs = REPO_FS(pRepo); - SFSIter fsiter; - SDFileSet *pSet; + SSyncH synch = {0}; - // Disable commit while syncing TSDB files - sem_wait(&(pRepo->readyToCommit)); + tsdbInitSyncH(&synch, pRepo, socketFd); + // Disable TSDB commit + sem_post(&(pRepo->readyToCommit)); - // Sync send meta file - if (tsdbSyncSendMeta(pRepo, socketFd, pfs->cstatus->pmf) < 0) { - tsdbError("vgId:%d failed to sync send meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); - sem_post(&(pRepo->readyToCommit)); - return -1; + if (tsdbSyncSendMeta(&synch) < 0) { + tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; } - // Sync send SDFileSet - tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); - - while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) { - if (tsdbSyncSendDFileSet(pRepo, socketFd, pSet) < 0) { - sem_post(&(pRepo->readyToCommit)); - return -1; - } + if (tsdbSyncSendDFileSetArray(&synch) < 0) { + tsdbError("vgId:%d failed to send data file set array since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; } - // Enable commit - sem_post(&(pRepo->readyToCommit)); + // Enable TSDB commit + sem_wait(&(pRepo->readyToCommit)); + tsdbDestroySyncH(&synch); return 0; + +_err: + sem_wait(&(pRepo->readyToCommit)); + tsdbDestroySyncH(&synch); + return -1; } int tsdbSyncRecv(STsdbRepo *pRepo, int socketFd) { - SFSIter fsiter; - SDFileSet *pSet; - SDFileSet dset; - SDFileSet *pRecvSet = &dset; - uint32_t tlen; - char buf[128]; - void * pBuf = NULL; + SSyncH synch; + tsdbInitSyncH(&synch, pRepo, socketFd); tsdbStartFSTxn(pRepo, 0, 0); - // Sync recv meta file from remote - if (tsdbSyncRecvMeta(pRepo, socketFd) < 0) { - // TODO + if (tsdbSyncRecvMeta(&synch) < 0) { + tsdbError("vgId:%d failed to sync recv meta file from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } - // Sync recv SDFileSet - tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); - pSet = tsdbFSIterNext(&fsiter); - - if (taosReadMsg(socketFd, buf, sizeof(uint32_t)) < 0) { - // TODO + if (tsdbSyncRecvDFileSetArray(&synch) < 0) { + tsdbError("vgId:%d failed to sync recv data file set from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } - taosDecodeFixedU32(buf, &tlen); - if (tlen == 0) { - // No more remote files - pRecvSet = NULL; - } else { - // Has remote files - if (tsdbMakeRoom(&pBuf, tlen) < 0) { - // TODO - goto _err; - } - - if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { - // TODO - goto _err; - } - - pRecvSet = &dset; - tsdbDecodeDFileSet(pBuf, pRecvSet); - } - - while (true) { - if (pSet == NULL && pRecvSet == NULL) break; - - if (pSet == NULL) { - // TODO: local not has, copy from remote - // Process the next remote fset(next pRecvSet) - } else { - if (pRecvSet == NULL) { - // Remote not has, just remove this file - pSet = tsdbFSIterNext(&fsiter); - } else { - if (pSet->fid == pRecvSet->fid) { - if (tsdbIsFSetSame(pSet, pRecvSet)) { - tsdbUpdateDFileSet(REPO_FS(pRepo), pSet); - } else { - // Copy from remote - } - pSet = tsdbFSIterNext(&fsiter); - // Process the next remote fset - } else if (pSet->fid < pRecvSet->fid) { - // Remote has not, just remove this file - pSet = tsdbFSIterNext(&fsiter); - } else { - // not has, copy pRecvSet from remote - // Process the next remote fset - } - } + // TODO: need to restart TSDB or reload TSDB here - } - } - tsdbEndFSTxn(pRepo); + tsdbDestroySyncH(&synch); return 0; _err: - taosTZfree(pBuf); tsdbEndFSTxnWithError(REPO_FS(pRepo)); + tsdbDestroySyncH(&synch); return -1; } -static int tsdbSyncSendMeta(STsdbRepo *pRepo, int socketFd, SMFile *pmf) { - void * pBuf = NULL; - uint32_t tlen = 0; - void * ptr; - SMFile mf; - SMFile * pMFile = NULL; +static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, int socketFd) { + pSyncH->pRepo = pRepo; + pSyncH->socketFd = socketFd; + tsdbGetRtnSnap(pRepo, &(pSyncH->rtn)); +} + +static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); } - if (pmf) { - // copy out - mf = *pmf; - pMFile = &mf; +// ============ SYNC META API +static int tsdbSyncSendMeta(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + bool toSendMeta = false; + SMFile mf; + + // Send meta info to remote + if (tsdbSendMetaInfo(pSynch) < 0) { + tsdbError("vgId:%d failed to send meta file info since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; } - if (pMFile) { - tlen = tsdbEncodeMFInfo(NULL, TSDB_FILE_INFO(pMFile)) + sizeof(TSCKSUM); + if (pRepo->fs->cstatus->pmf == NULL) { + // No meta file, not need to wait to retrieve meta file + return 0; } - if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { + if (tsdbRecvDecision(pSynch, &toSendMeta) < 0) { + tsdbError("vgId:%d failed to recv send file decision since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - ptr = pBuf; - taosEncodeFixedU32(&ptr, tlen); - if (pMFile) { - tsdbEncodeMFInfo(&ptr, TSDB_FILE_INFO(pMFile)); - taosCalcChecksumAppend(0, (uint8_t *)pBuf, POINTER_DISTANCE(ptr, pBuf)); - ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM)); + if (toSendMeta) { + tsdbInitMFileEx(&mf, pRepo->fs->cstatus->pmf); + if (tsdbOpenMFile(&mf, O_RDONLY) < 0) { + tsdbError("vgId:%d failed to open meta file while sync send meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&mf), 0, mf.info.size) < mf.info.size) { + tsdbError("vgId:%d failed to copy meta file to remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbCloseMFile(&mf); + return -1; + } + + tsdbCloseMFile(&mf); } - if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { - tsdbError("vgId:%d failed to sync meta file since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + return 0; +} + +static int tsdbSyncRecvMeta(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + SMFile * pLMFile = pRepo->fs->cstatus->pmf; + + // Recv meta info from remote + if (tsdbRecvMetaInfo(pSynch) < 0) { + tsdbError("vgId:%d failed to recv meta info from remote since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; } - if (pMFile == NULL) { - // No meta file, no need to send + // No meta file, do nothing (rm local meta file) + if (pSynch->pmf == NULL) { return 0; } - bool shouldSend = false; - { - // TODO: Recv command to know if need to send file - } + if (pLMFile == NULL || memcmp(&(pSynch->pmf->info), &(pLMFile->info), sizeof(SMFInfo)) != 0) { + // Local has no meta file or has a different meta file, need to copy from remote + if (tsdbSendDecision(pSynch, true) < 0) { + // TODO + return -1; + } - if (shouldSend) { - if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { - tsdbError("vgId:%d failed to open meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; + // Recv from remote + SMFile mf; + SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}; + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + if (tsdbCreateMFile(&mf, false) < 0) { + // TODO + return -1; } - if (taosSendFile(socketFd, TSDB_FILE_FD(pMFile), 0, pMFile->info.size) < pMFile->info.size) { - tsdbError("vgId:%d failed to send meta file since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - tsdbCloseMFile(pMFile); - goto _err; + if (taosCopyFds(pSynch->socketFd, TSDB_FILE_FD(&mf), pSynch->pmf->info.size) < pSynch->pmf->info.size) { + // TODO + tsdbCloseMFile(&mf); + tsdbRemoveMFile(&mf); + return -1; } - tsdbCloseMFile(pMFile); + tsdbCloseMFile(&mf); + tsdbUpdateMFile(REPO_FS(pRepo), &mf); + } else { + if (tsdbSendDecision(pSynch, false) < 0) { + // TODO + return -1; + } } return 0; +} -_err: - taosTZfree(pBuf); - return -1; +static int tsdbSendMetaInfo(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + uint32_t tlen = 0; + SMFile * pMFile = pRepo->fs->cstatus->pmf; + + if (pMFile) { + tlen = tlen + tsdbEncodeSMFile(NULL, pMFile) + sizeof(TSCKSUM); + } + + if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + return -1; + } + + void *ptr = SYNC_BUFFER(pSynch); + taosEncodeFixedU32(&ptr, tlen); + void *tptr = ptr; + if (pMFile) { + tsdbEncodeSMFile(&ptr, pMFile); + taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen); + } + + if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(uint32_t)) < tlen + sizeof(uint32_t)) { + tsdbError("vgId:%d failed to send sync meta file info to remote since %s", REPO_ID(pRepo), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; } -static int tsdbSyncRecvMeta(STsdbRepo *pRepo, int socketFd) { +static int tsdbRecvMetaInfo(SSyncH *pSynch) { uint32_t tlen; - char buf[128]; - void * pBuf = NULL; - SMFInfo mfInfo; - SMFile * pMFile = pRepo->fs->cstatus->pmf; - SMFile mf; - - if (taosReadMsg(socketFd, (void *)buf, sizeof(int32_t)) < sizeof(int32_t)) { - tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + char buf[64] = "\0"; + + if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) { + // TODO terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + return -1; } + taosDecodeFixedU32(buf, &tlen); - // Remote not has meta file, just remove meta file (do nothing) if (tlen == 0) { - // TODO: need to notify remote? + pSynch->pmf = NULL; return 0; } - if (tsdbMakeRoom(&pBuf, tlen) < 0) { - tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; + if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + return -1; } - if (taosReadMsg(socketFd, pBuf, tlen) < tlen) { - tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) { + // TODO terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + return -1; } - if (!taosCheckChecksumWhole((uint8_t *)pBuf, tlen)) { - tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); + if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) { + // TODO terrno = TSDB_CODE_TDB_MESSED_MSG; - goto _err; + return -1; } - void *ptr = pBuf; - ptr = tsdbDecodeMFInfo(ptr, &mfInfo); + pSynch->pmf = &(pSynch->mf); + tsdbDecodeSMFile(SYNC_BUFFER(pSynch), pSynch->pmf); - if (pMFile != NULL && memcmp(&(pMFile->info), &mfInfo, sizeof(SMInfo)) == 0) { - // has file and same as remote, just keep the old one - tsdbUpdateMFile(REPO_FS(pRepo), pMFile); - // Notify remote that no need to send meta file - { - // TODO - } - } else { - // Need to copy meta file from remote - SDiskID did = {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); - mf.info = mfInfo; + return 0; +} - // Create new file - if (tsdbCreateMFile(&mf, false) < 0) { - tsdbError("vgId:%d failed to create meta file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } +static int tsdbSendDecision(SSyncH *pSynch, bool toSend) { + uint8_t decision = toSend; - // Notify remote to send meta file - { - // TODO - } + if (taosWriteMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) { + // TODO + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - if (taosCopyFds(socketFd, mf.fd, mfInfo.size) < 0) { - tsdbError("vgId:%d failed to sync recv meta file since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - tsdbCloseMFile(&mf); - tsdbRemoveMFile(&mf); - goto _err; - } + return 0; +} - TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbUpdateMFile(REPO_FS(pRepo), &mf); +static int tsdbRecvDecision(SSyncH *pSynch, bool *toSend) { + uint8_t decision; + + if (taosReadMsg(pSynch->socketFd, (void *)(&decision), sizeof(decision)) < sizeof(decision)) { + // TODO + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } + *toSend = decision; + return 0; +} -_err: - taosTZfree(pBuf); - return -1; +// ========== SYNC DATA FILE SET ARRAY API +static int tsdbSyncSendDFileSetArray(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + STsdbFS * pfs = REPO_FS(pRepo); + SFSIter fsiter; + SDFileSet *pSet; + + tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); + + do { + pSet = tsdbFSIterNext(&fsiter); + if (tsdbSyncSendDFileSet(pSynch, pSet) < 0) { + // TODO + return -1; + } + + // No more file set to send, jut break + if (pSet == NULL) { + break; + } + } while (true); + + return 0; } -static int tsdbSyncSendDFileSet(STsdbRepo *pRepo, int socketFd, SDFileSet *pOSet) { - void * pBuf = NULL; - uint32_t tlen = 0; - void * ptr; - SDFileSet dset; - SDFileSet *pSet = NULL; +static int tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { + STsdbRepo *pRepo = pSynch->pRepo; + STsdbFS * pfs = REPO_FS(pRepo); + SFSIter fsiter; + SDFileSet *pLSet; // Local file set - if (pOSet) { - dset = *pOSet; - pSet = &dset; - } + tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); - if (pSet) { - tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM); + pLSet = tsdbFSIterNext(&fsiter); + if (tsdbRecvDFileSetInfo(pSynch) < 0) { + // TODO + return -1; } - if (tsdbMakeRoom(&pBuf, sizeof(tlen) + tlen) < 0) { - return -1; + while (true) { + if (pLSet == NULL && pSynch->pdf == NULL) break; + + if (pLSet && (pSynch->pdf == NULL || pLSet->fid < pSynch->pdf->fid)) { + // remote not has pLSet->fid set, just remove local (do nothing to remote the fset) + pLSet = tsdbFSIterNext(&fsiter); + } else { + if (pLSet && pSynch->pdf && pLSet->fid == pSynch->pdf->fid && tsdbIsTowFSetSame(pLSet, pSynch->pdf)) { + // Just keep local files and notify remote not to send + if (tsdbUpdateDFileSet(pfs, pLSet) < 0) { + // TODO + return -1; + } + + if (tsdbSendDecision(pSynch, false) < 0) { + // TODO + return -1; + } + } else { + // Need to copy from remote + + // Notify remote to send there file here + if (tsdbSendDecision(pSynch, true) < 0) { + // TODO + return -1; + } + + // Create local files and copy from remote + SDiskID did; + SDFileSet fset; + + tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id)); + if (did.level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + tsdbInitDFileSet(&fset, did, REPO_ID(pRepo), pSynch->pdf->fid, FS_TXN_VERSION(pfs)); + + // Create new FSET + if (tsdbCreateDFileSet(&fset, false) < 0) { + // TODO + return -1; + } + + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(&fset, ftype); // local file + SDFile *pRDFile = TSDB_DFILE_IN_SET(pSynch->pdf, ftype); // remote file + if (taosCopyFds(pSynch->socketFd, pDFile->fd, pRDFile->info.size) < pRDFile->info.size) { + // TODO + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseDFileSet(&fset); + tsdbRemoveDFileSet(&fset); + return -1; + } + // Update new file info + pDFile->info = pRDFile->info; + } + + tsdbCloseDFileSet(&fset); + if (tsdbUpdateDFileSet(pfs, &fset) < 0) { + // TODO + return -1; + } + } + + // Move forward + if (tsdbRecvDFileSetInfo(pSynch) < 0) { + // TODO + return -1; + } + + if (pLSet) { + pLSet = tsdbFSIterNext(&fsiter); + } + } + +#if 0 + if (pLSet == NULL) { + // Copy from remote >>>>>>>>>>> + } else { + if (pSynch->pdf == NULL) { + // Remove local file, just ignore ++++++++++++++ + pLSet = tsdbFSIterNext(&fsiter); + } else { + if (pLSet->fid < pSynch->pdf->fid) { + // Remove local file, just ignore ++++++++++++ + pLSet = tsdbFSIterNext(&fsiter); + } else if (pLSet->fid > pSynch->pdf->fid){ + // Copy from remote >>>>>>>>>>>>>> + if (tsdbRecvDFileSetInfo(pSynch) < 0) { + // TODO + return -1; + } + } else { + if (true/*TODO: is same fset*/) { + // No need to copy --------------------- + } else { + // copy from remote >>>>>>>>>>>>>. + } + } + } + } +#endif } - ptr = pBuf; - taosEncodeFixedU32(&ptr, tlen); - if (pSet) { - tsdbEncodeDFileSet(&ptr, pSet); - taosCalcChecksumAppend(0, (uint8_t *)pBuf, tlen); - ptr = POINTER_SHIFT(ptr, sizeof(TSCKSUM)); + return 0; +} + +static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile1 = TSDB_DFILE_IN_SET(pSet1, ftype); + SDFile *pDFile2 = TSDB_DFILE_IN_SET(pSet2, ftype); + + if (memcmp((void *)(TSDB_FILE_INFO(pDFile1)), (void *)(TSDB_FILE_INFO(pDFile2)), sizeof(SDFInfo)) != 0) { + return false; + } } - if (taosWriteMsg(socketFd, pBuf, POINTER_DISTANCE(ptr, pBuf)) < POINTER_DISTANCE(ptr, pBuf)) { + return true; +} + +static int tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { + bool toSend = false; + + if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) { // TODO - goto _err; + return -1; } + // No file any more, no need to send file, just return if (pSet == NULL) { - // No need to wait return 0; } - bool shouldSend = false; - { - // TODO: Recv command to know if need to send file + if (tsdbRecvDecision(pSynch, &toSend) < 0) { + return -1; } - if (shouldSend) { + if (toSend) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype); + SDFile df = *TSDB_DFILE_IN_SET(pSet, ftype); - if (tsdbOpenDFile(pDFile, O_RDONLY) < 0) { - // TODO - goto _err; + if (tsdbOpenDFile(&df, O_RDONLY) < 0) { + return -1; } - if (taosSendFile(socketFd, TSDB_FILE_FD(pDFile), 0, pDFile->info.size) < pDFile->info.size) { - // TODO - tsdbCloseDFile(pDFile); - goto _err; + if (taosSendFile(pSynch->socketFd, TSDB_FILE_FD(&df), 0, df.info.size) < df.info.size) { + tsdbCloseDFile(&df); + return -1; } - tsdbCloseDFile(pDFile); + tsdbCloseDFile(&df); } } - taosTZfree(pBuf); return 0; - -_err: - taosTZfree(pBuf); - return -1; } -static UNUSED_FUNC int tsdbSyncRecvDFileSet(STsdbRepo *pRepo, int socketFd) { - // TODO +static int tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { + uint32_t tlen = 0; + + if (pSet) { + tlen = tsdbEncodeDFileSet(NULL, pSet) + sizeof(TSCKSUM); + } + + if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen + sizeof(tlen)) < 0) { + return -1; + } + + void *ptr = SYNC_BUFFER(pSynch); + taosEncodeFixedU32(&ptr, tlen); + void *tptr = ptr; + if (pSet) { + tsdbEncodeDFileSet(&ptr, pSet); + taosCalcChecksumAppend(0, (uint8_t *)tptr, tlen); + } + + if (taosWriteMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen + sizeof(tlen)) < tlen + sizeof(tlen)) { + // TODO + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + return 0; } -static bool tsdbIsFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { - for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (memcmp(TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet1, ftype)), TSDB_FILE_INFO(TSDB_DFILE_IN_SET(pSet2, ftype)), - sizeof(SDFInfo)) != 0) { - return false; - } +static int tsdbRecvDFileSetInfo(SSyncH *pSynch) { + uint32_t tlen; + char buf[64] = "\0"; + + if (taosReadMsg(pSynch->socketFd, buf, sizeof(tlen)) < sizeof(tlen)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - return true; + taosDecodeFixedU32(buf, &tlen); + + if (tlen == 0) { + pSynch->pdf = NULL; + return 0; + } + + if (tsdbMakeRoom((void **)(&SYNC_BUFFER(pSynch)), tlen) < 0) { + return -1; + } + + if (taosReadMsg(pSynch->socketFd, SYNC_BUFFER(pSynch), tlen) < tlen) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (!taosCheckChecksumWhole((uint8_t *)SYNC_BUFFER(pSynch), tlen)) { + terrno = TSDB_CODE_TDB_MESSED_MSG; + return -1; + } + + pSynch->pdf = &(pSynch->df); + tsdbDecodeDFileSet(SYNC_BUFFER(pSynch), pSynch->pdf); + + return 0; } \ No newline at end of file diff --git a/src/tsdb/tests/tsdbTests.cpp b/src/tsdb/tests/tsdbTests.cpp index ef5ed6f044..ac254d6c34 100644 --- a/src/tsdb/tests/tsdbTests.cpp +++ b/src/tsdb/tests/tsdbTests.cpp @@ -12,7 +12,7 @@ static double getCurTime() { } typedef struct { - TSDB_REPO_T *pRepo; + STsdbRepo *pRepo; bool isAscend; int tid; uint64_t uid; @@ -143,7 +143,7 @@ TEST(TsdbTest, testInsertSpeed) { // Create and open repository tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1); tsdbCreateRepo(rootDir, &tsdbCfg); - TSDB_REPO_T *repo = tsdbOpenRepo(rootDir, NULL); + STsdbRepo *repo = tsdbOpenRepo(rootDir, NULL); ASSERT_NE(repo, nullptr); // Create table -- GitLab