diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 256b8189f8fc69345b27fdf702fb705d22ac3c10..16f5c93374754fae475a8fbac77fb8ddcc29ffad 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -156,18 +156,12 @@ typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, - TSDB_FILE_TYPE_STAT, - TSDB_FILE_TYPE_NHEAD, - TSDB_FILE_TYPE_NDATA, - TSDB_FILE_TYPE_NLAST, - TSDB_FILE_TYPE_NSTAT + TSDB_FILE_TYPE_MANIFEST, + TSDB_FILE_TYPE_META, + TSDB_FILE_TYPE_CFG } TSDB_FILE_TYPE; -#ifndef TDINTERNAL #define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1) -#else -#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1) -#endif typedef struct { uint32_t magic; @@ -552,8 +546,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) { #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) -char* tsdbGetMetaFileName(char* rootDir); -void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname); +int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char** fname); int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); char* tsdbGetDataDirName(char* rootDir); @@ -572,6 +565,14 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); +// -------------------------- ADDED -------------------------- +typedef struct { + STsdbRepo* pRepo; + char fname[TSDB_FILENAME_LEN]; // manifest file name + int fd; + void* pBuffer; + SList* pModLog; +} SCommitHandle; #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c new file mode 100644 index 0000000000000000000000000000000000000000..dfe3d59380b747bb07ab2964aeb69bb6015f8260 --- /dev/null +++ b/src/tsdb/src/tsdbCommit.c @@ -0,0 +1,567 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include + +#include "tsdbMain.h" +#include "tchecksum.h" + +#define TSDB_DATA_FILE_CHANGE 0 +#define TSDB_META_FILE_CHANGE 1 + +typedef struct { + int maxIters; + SCommitIter *pIters; + SRWHelper whelper; + SDataCols * pDataCols; +} STSCommitHandle; + +typedef struct { + int32_t len; + int32_t type; + char change[]; +} STsdbFileChange; + +typedef struct { + char oname[TSDB_FILENAME_LEN]; + char nname[TSDB_FILENAME_LEN]; + SStoreInfo info; +} SMetaFileChange; + +typedef struct { + SFileGroup ofgroup; + SFileGroup nfgroup; +} SDataFileChange; + +int tsdbCommitData(STsdbRepo *pRepo) { + ASSERT(pRepo->commit == 1 && pRepo->imem != NULL); + + SCommitHandle commitHandle = {0}; + SCommitHandle *pCommitH = &commitHandle; + + pCommitH->pRepo = pRepo; + + if (tsdbStartCommit(pCommitH) < 0) return -1; + + if (tsdbCommitTimeSeriesData(pCommitH) < 0) goto _err; + + if (tsdbCommitMetaData(pCommitH) < 0) goto _err; + + if (tsdbApplyRetention(pCommitH) < 0) goto _err; + + tsdbEndCommit(pCommitH, false); + + return 0; + +_err: + tsdbEndCommit(pCommitH, true); + return -1; +} + +static int tsdbStartCommit(SCommitHandle *pCommitH) { + STsdbRepo *pRepo = pCommitH->pRepo; + SMemTable *pMem = pRepo->imem; + STsdbCfg * pCfg = &(pRepo->config); + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), + pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + + pCommitH->pModLog = tdListNew(sizeof(void *)); + if (pCommitH->pModLog == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pCommitH->fd = -1; + + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname); + pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755); + if (pCommitH->fd < 0) { + tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + return 0; + +_err: + if (pCommitH->fd >= 0) { + close(pCommitH->fd); + pCommitH->fd = -1; + remove(pCommitH->fname); + } + tdListFree(pCommitH->pModLog); + return -1; +} + +static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { + // TODO: append commit over flag + if (false /* tsdbLogCommitOver(pCommitH) < 0 */) { + hasError = true; + } + + tsdbInfo("vgId:%d commit over, commit status: %s", REPO_ID(pRepo), hasError ? "FAILED" : "SUCCEED"); + + SListNode *pNode = NULL; + + while ((pNode = tdListPopHead(pCommitH->pModLog)) != NULL) { + STsdbFileChange *pChange = (STsdbFileChange *)(*(void **)pNode->data); + + tsdbApplyFileChange(pChange, !hasError); + free(pNode); + free(pChange); + } + + close(pCommitH->fd); + pCommitH->fd = -1; + remove(pCommitH->fname); + tdListFree(pCommitH->pModLog); + return; +} + +static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { + STsdbRepo * pRepo = pCommitH->pRepo; + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &(pRepo->config); + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbFileH *pFileH = pRepo->tsdbFileH; + + int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); + for (int i = 0; i < pFileH->nFGroups; i++) { + SFileGroup *pFGroup = pFileH->pFGroup[i]; + if (pFGroup->fileId < mfid) { + STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(STsdbFileChange) + sizeof(STsdbFileChange)); + if (pChange == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pChange->type = TSDB_DATA_FILE_CHANGE; + SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; + pDataFileChange->ofgroup = pFGroup; + } else { + break; + } + } + + if (pMem->numOfRows <= 0) return 0; + + // Initialize resources + STSCommitHandle tsCommitH = {0}; + if (tsdbInitTSCommitHandle(&tsCommitH, pRepo) < 0) return -1; + + // Commit Time-Series data file by file + int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); + int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + + for (int fid = sfid; fid <= efid; fid++) { + TSKEY minKey = 0, maxKey = 0; + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + if (fid < mfid) { + // TODO: skip data in this file beyond retentioin and continue; + continue; + } + + if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; + + { + // TODO: manifest log file group action + } + + if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) { + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + } + + tsdbDestroyTSCommitHandle(&tsCommitH); + return 0; + +_err: + tsdbDestroyTSCommitHandle(&tsCommitH); + return -1; +} + +// Function to commit meta data +static int tsdbCommitMetaData(SCommitHandle *pCommitH) { + STsdbRepo *pRepo = pCommitH->pRepo; + SKVStore * pStore = pRepo->tsdbMeta->pStore; + SMemTable *pMem = pRepo->imem; + SActObj * pAct = NULL; + SActCont * pCont = NULL; + + if (listNEles(pMem->actList) <= 0) return 0; + + // Log meta file change + if (tsdbLogMetaFileChange(pCommitH) < 0) return -1; + + // Commit data + if (tdKVStoreStartCommit(pStore) < 0) { + tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + SListNode *pNode = NULL; + + while ((pNode = tdListPopHead(pMem->actList)) != NULL) { + pAct = (SActObj *)pNode->data; + if (pAct->act == TSDB_UPDATE_META) { + pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); + if (tdUpdateKVStoreRecord(pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { + tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + tdKVStoreEndCommit(pStore, true /*hasErro*/); + return -1; + } + } else if (pAct->act == TSDB_DROP_META) { + if (tdDropKVStoreRecord(pStore, pAct->uid) < 0) { + tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, + tstrerror(terrno)); + tdKVStoreEndCommit(pStore, true /*hasErro*/); + return -1; + } + } else { + ASSERT(false); + } + } + + if (tdKVStoreEndCommit(pMeta->pStore, false /*hasError = false*/) < 0) { + tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + return 0; +} + +static int tsdbApplyRetention(SCommitHandle *pCommitH) { + // TODO + return 0; +} + +static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + STsdbMeta *pMeta = pRepo->tsdbMeta; + + SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); + if (iters == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; + + // reference all tables + for (int i = 0; i < pMem->maxTables; i++) { + if (pMeta->tables[i] != NULL) { + tsdbRefTable(pMeta->tables[i]); + iters[i].pTable = pMeta->tables[i]; + } + } + + if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; + + for (int i = 0; i < pMem->maxTables; i++) { + if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { + if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + tSkipListIterNext(iters[i].pIter); + } + } + + return iters; + +_err: + tsdbDestroyCommitIters(iters, pMem->maxTables); + return NULL; +} + +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { + if (iters == NULL) return; + + for (int i = 1; i < maxTables; i++) { + if (iters[i].pTable != NULL) { + tsdbUnRefTable(iters[i].pTable); + tSkipListDestroyIter(iters[i].pIter); + } + } + + free(iters); +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) { + char * dataDir = NULL; + STsdbCfg * pCfg = &pRepo->config; + STsdbFileH * pFileH = pRepo->tsdbFileH; + SFileGroup * pGroup = NULL; + SMemTable * pMem = pRepo->imem; + bool newLast = false; + SCommitIter *iters = pTSCh->pIters; + SRWHelper * pHelper = &(pTSCh->whelper); + SDataCols * pDataCols = pTSCh->pDataCols; + + // Create and open files for commit + dataDir = tsdbGetDataDirName(pRepo->rootDir); + if (dataDir == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { + tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + + // Open files for write/read + if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { + tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + newLast = TSDB_NLAST_FILE_OPENED(pHelper); + + if (tsdbLoadCompIdx(pHelper, NULL) < 0) { + tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Loop to commit data in each table + for (int tid = 1; tid < pMem->maxTables; tid++) { + SCommitIter *pIter = iters + tid; + if (pIter->pTable == NULL) continue; + + taosRLockLatch(&(pIter->pTable->latch)); + + if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; + + if (pIter->pIter != NULL) { + if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), + tstrerror(terrno)); + goto _err; + } + } + + taosRUnLockLatch(&(pIter->pTable->latch)); + + // Move the last block to the new .l file if neccessary + if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { + tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + + // Write the SCompBlock part + if (tsdbWriteCompInfo(pHelper) < 0) { + tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _err; + } + } + + if (tsdbWriteCompIdx(pHelper) < 0) { + tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _err; + } + + taosTFree(dataDir); + tsdbCloseHelperFile(pHelper, 0, pGroup); + + pthread_rwlock_wrlock(&(pFileH->fhlock)); + + (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; + + if (newLast) { + (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); + pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; + } else { + pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; + } + + pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; + + pthread_rwlock_unlock(&(pFileH->fhlock)); + + return 0; + +_err: + taosTFree(dataDir); + tsdbCloseHelperFile(pHelper, 1, NULL); + return -1; +} + +static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); + if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; + } + return 0; +} + +static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { + STsdbCfg * pCfg = &(pRepo->config); + STsdbMeta *pMeta = pRepo->tsdbMeta; + SMemTable *pMem = pRepo->imem; + + pTSCh->pIters = tsdbCreateCommitIters(pRepo); + if (pTSCh->pIters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbDestroyTSCommitHandle(pTSCh); + return -1; + } + pTSCh->maxIters = pMem->maxTables; + + if (tsdbInitWriteHelper(&(pTSCh->whelper), pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbDestroyTSCommitHandle(pTSCh); + return -1; + } + + pTSCh->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); + if (pTSCh->pDataCols == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + tsdbDestroyTSCommitHandle(pTSCh); + return -1; + } + + return 0; +} + +static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) { + if (pTSCh) { + tdFreeDataCols(pTSCh->pDataCols); + tsdbDestroyHelper(&(pTSCh->whelper)); + tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters); + } +} + +static int tsdbLogFileChange(SCommitHandle *pCommitH, STsdbFileChange *pChange) { + STsdbRepo *pRepo = pCommitH->pRepo; + + pChange->len = tsdbEncodeFileChange(NULL, pChange) + sizeof(TSCKSUM); + + if ((pCommitH->pBuffer = taosTRealloc(pCommitH->pBuffer, pChange->len)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (taosTWrite(pCommitH->fd, (void *)pChange, sizeof(*pChange)) < sizeof(*pChange)) { + tsdbError("vgId:%d failed to write file change to file %s since %s", REPO_ID(pRepo), pCommitH->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + int tsize = tsdbEncodeFileChange(pCommitH->pBuffer, pChange); + ASSERT(tsize + sizeof(TSCKSUM) == pChange->len); + + taosCalcChecksumAppend(0, pCommitH->pBuffer, pChange->len); + + if (taosTWrite(pCommitH->fd, pCommitH->pBuffer, pChange->len) < pChange->len) { + tsdbError("vgId:%d failed to write file change encode to file %s, bytes %d since %s", REPO_ID(pRepo), + pCommitH->fname, pChange->len, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (fsync(pCommitH->fd) < 0) { + tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { + int tsize = 0; + if (pChange->type == TSDB_META_FILE_CHANGE) { + SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change; + tsize += taosEncodeString(buf, pMetaChange->oname); + tsize += taosEncodeString(buf, pMetaChange->nname); + tsize += tdEncodeStoreInfo(buf, pMetaChange->info); + } else if (pChange->type == TSDB_DATA_FILE_CHANGE) { + SDataFileChange *pDataChange = (SDataFileChange *)pChange->change; + // TODO + } else { + ASSERT(false); + } + + return tsize; +} + +static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) { + // TODO + return buf; +} + +static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) { + STsdbRepo *pRepo = pCommitH->pRepo; + SKVStore * pStore = pRepo->tsdbMeta->pStore; + + STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(*pChange) + sizeof(SMetaFileChange)); + if (pChange == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pChange->type = TSDB_META_FILE_CHANGE; + SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change); + strncpy(pMetaChange->oname, pStore->fname, TSDB_FILENAME_LEN); + strncpy(pMetaChange->nname, pStore->fname, TSDB_FILENAME_LEN); + pMetaChange->info = pStore->info; + if (tsdbLogFileChange(pCommitH, pChange) < 0) { + free(pChange); + return -1; + } + tdListPrepend(pCommitH->pModLog, &pChange); + + return 0; +} + +static int + +static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) { + if (pChange->type == TSDB_META_FILE_CHANGE) { + SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change; + + if (isCommitEnd) { + if (strncmp(pMetaChange->oname, pMetaChange->nname) != 0) { + (void)remove(pMetaChange->oname); + } + } else { // roll back + // TODO + } + } else if (pChange->len == TSDB_DATA_FILE_CHANGE) { + + } else { + ASSERT(0); + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 626ad77da2eab4be9e94516c4e5c7c0e5a45837e..b11f23bb02defaf0ec17a62dc371be29c2b543eb 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -23,9 +23,6 @@ #include "tsdbMain.h" #include "tutil.h" - -const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"}; - static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static void tsdbDestroyFile(SFile *pFile); static int compFGroup(const void *arg1, const void *arg2); @@ -128,7 +125,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { if (fid < mfid) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbGetDataFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname); + tsdbGetFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname); (void)remove(fname); } continue; @@ -345,7 +342,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { memset((void *)pFile, 0, sizeof(SFile)); pFile->fd = -1; - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); + tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname); @@ -525,7 +522,7 @@ _err: static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); + tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname); pFile->fd = -1; if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a1e6376304a54ea810f3662bd30e3f97fd53765c..e205c9959e2934015c860e3ad5173ff63b24acd1 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -23,9 +23,6 @@ #include "tsdb.h" #include "tulog.h" -#define TSDB_CFG_FILE_NAME "config" -#define TSDB_DATA_DIR_NAME "data" -#define TSDB_META_FILE_NAME "meta" #define TSDB_META_FILE_INDEX 10000000 #define IS_VALID_PRECISION(precision) \ (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) @@ -49,7 +46,6 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg); static int32_t tsdbUnsetRepoEnv(char *rootDir); static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg); static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg); -static char * tsdbGetCfgFname(char *rootDir); static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static void tsdbFreeRepo(STsdbRepo *pRepo); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); @@ -233,7 +229,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { - fname = tsdbGetMetaFileName(pRepo->rootDir); + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname); *index = TSDB_META_FILE_INDEX; magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); } else { @@ -345,22 +341,6 @@ int tsdbGetState(TSDB_REPO_T *repo) { } // ----------------- INTERNAL FUNCTIONS ----------------- -char *tsdbGetMetaFileName(char *rootDir) { - int tlen = (int)(strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 2); - char *fname = calloc(1, tlen); - if (fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - snprintf(fname, tlen, "%s/%s", rootDir, TSDB_META_FILE_NAME); - return fname; -} - -void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) { - snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]); -} - int tsdbLockRepo(STsdbRepo *pRepo) { int code = pthread_mutex_lock(&pRepo->mutex); if (code != 0) { @@ -557,15 +537,11 @@ static int32_t tsdbUnsetRepoEnv(char *rootDir) { static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { int fd = -1; - char *fname = NULL; + char fname[TSDB_FILENAME_LEN] = "\0"; char buf[TSDB_FILE_HEAD_SIZE] = "\0"; char *pBuf = buf; - fname = tsdbGetCfgFname(rootDir); - if (fname == NULL) { - tsdbError("vgId:%d failed to save configuration since %s", pCfg->tsdbId, tstrerror(terrno)); - goto _err; - } + tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname); fd = open(fname, O_WRONLY | O_CREAT, 0755); if (fd < 0) { @@ -592,26 +568,20 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { goto _err; } - free(fname); close(fd); return 0; _err: - taosTFree(fname); if (fd >= 0) close(fd); return -1; } static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { - char *fname = NULL; - int fd = -1; - char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + char fname[TSDB_FILENAME_LEN] = "\0"; + int fd = -1; + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; - fname = tsdbGetCfgFname(rootDir); - if (fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } + tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname); fd = open(fname, O_RDONLY); if (fd < 0) { @@ -634,29 +604,14 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { tsdbDecodeCfg(buf, pCfg); - taosTFree(fname); close(fd); - return 0; _err: - taosTFree(fname); if (fd >= 0) close(fd); return -1; } -static char *tsdbGetCfgFname(char *rootDir) { - int tlen = (int)(strlen(rootDir) + strlen(TSDB_CFG_FILE_NAME) + 2); - char *fname = calloc(1, tlen); - if (fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - snprintf(fname, tlen, "%s/%s", rootDir, TSDB_CFG_FILE_NAME); - return fname; -} - static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo)); if (pRepo == NULL) { diff --git a/src/tsdb/src/tsdbManifest.c b/src/tsdb/src/tsdbManifest.c new file mode 100644 index 0000000000000000000000000000000000000000..17527cdee89ecc84e118b4a43dc08a3c33bedc87 --- /dev/null +++ b/src/tsdb/src/tsdbManifest.c @@ -0,0 +1,238 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include +#include + +#include "tchecksum.h" +#include "tsdbMain.h" + +#define TSDB_MANIFEST_FILE_VERSION 0 +#define TSDB_MANIFEST_FILE_HEADER_SIZE 128 +#define TSDB_MANIFEST_END "C0D09F476DEF4A32B694A6A9E7B7B240" +#define TSDB_MANIFEST_END_SIZE 32 + +#define TSDB_MANIFEST_END_RECORD 0 +#define TSDB_MANIFEST_META_RECORD 1 +#define TSDB_MANIFEST_DATA_RECORD 2 + +typedef struct { + int type; + int len; +} SManifestRecord; + +int tsdbInitManifestHandle(STsdbRepo *pRepo, SManifestHandle *pManifest) { + STsdbCfg *pCfg = &(pRepo->config); + + pManifest->pBuffer = NULL; + pManifest->contSize = 0; + + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pManifest->fname)); + pManifest->fd = open(pManifest->fname, O_CREAT | O_APPEND, 0755); + if (pManifest->fd < 0) { + tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (tsdbWriteManifestHeader(pRepo, pManifest) < 0) { + tsdbCloseManifestHandle(pRepo, pManifest); + return -1; + } + + return 0; +} + +void tsdbCloseManifestHandle(SManifestHandle *pManifest) { + if (pManifest != NULL && pManifest->fd > 0) { + close(pManifest->fd); + pManifest->fd = -1; + } + + remove(pManifest->fname); + taosTZfree(pManifest->pBuffer); + pManifest->pBuffer = NULL; + pManifest->contSize = 0; + return 0; +} + +int tsdbAppendManifestRecord(SManifestHandle *pManifest, STsdbRepo *pRepo, int type) { + ASSERT(pManifest->pBuffer != NULL && taosTSizeof(pManifest->pBuffer) >= pManifest->contSize); + + if (pManifest->contSize > 0) { + if (tsdbManifestMakeMoreRoom(pManifest, sizeof(TSCKSUM)) < 0) return -1; + pManifest->contSize += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)pManifest->pBuffer, pManifest->contSize); + } + + SManifestRecord mRecord = {.type = type, .len = pManifest->contSize}; + + // Write mRecord part + if (taosTWrite(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < sizeof(mRecord)) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), sizeof(mRecord), pManifest->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + // Write buffer part + if (pManifest->contSize > 0 && taosTWrite(pManifest->fd, pManifest->pBuffer, pManifest->contSize) < pManifest->contSize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), pManifest->contSize, + pManifest->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (fsync(pManifest->fd) < 0) { + tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} + +int tsdbAppendManifestEnd(SManifestHandle *pManifest, STsdbRepo *pRepo) { + pManifest->contSize = 0; + return tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_END_RECORD); +} + +int tsdbManifestMakeRoom(SManifestHandle *pManifest, int expectedSize) { + pManifest->pBuffer = taosTRealloc(pManifest->pBuffer, expectedSize); + if (pManifest->pBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +int tsdbManifestMakeMoreRoom(SManifestHandle *pManifest, int moreSize) { + return tsdbManifestMakeRoom(pManifest, pManifest->contSize + moreSize); +} + +// TODO +bool tsdbIsManifestEnd(SManifestHandle *pManifest) { + SManifestRecord mRecord; + + if (lseek(pManifest->fd, sizeof(mRecord), SEEK_END) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return false; + } + + if (taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < 0) { + tsdbError("vgId:%d failed to read manifest end from file %s since %s", REPO_ID(pRepo), pManifest->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return false; + } + + return (mRecord.type == TSDB_MANII) +} + +int tsdbManifestRollBackOrForward(SManifestHandle *pManifest, bool isManifestEnd, STsdbRepo *pRepo) { + SManifestRecord mRecord; + + if (lseek(pManifest->fd, TSDB_MANIFEST_FILE_HEADER_SIZE, SEEK_SET) < 0) { + tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return; + } + + while (true) { + ssize_t size = 0; + + size = taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)); + if (size < 0) { + tsdbError("vgId:%d failed to read SManifestRecord part from file %s since %s", REPO_ID(pRepo), pManifest->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (size < sizeof(mRecord)) break; + if ((mRecord.type != TSDB_MANIFEST_DATA_RECORD && mRecord.type != TSDB_MANIFEST_META_RECORD && mRecord.type != TSDB_MANIFEST_END_RECORD) || mRecord.len < 0) { + tsdbError("vgId:%d manifest file %s is broken since invalid mRecord content", REPO_ID(pRepo), pManifest->fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + if (mRecord.type == TSDB_MANIFEST_END_RECORD) { + ASSERT(isManifestEnd && mRecord.len == 0); + break; + } + + if (tsdbManifestMakeRoom(pManifest, mRecord.len) < 0) return -1; + + size = taosTRead(pManifest->fd, pManifest->pBuffer, mRecord.len); + if (size < 0) { + tsdbError("vgId:%d failed to read SManifestRecord content from file %s since %s", REPO_ID(pRepo), pManifest->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (size < mRecord.len) break; + + if (!taosCheckChecksumWhole((uint8_t *)pManifest->pBuffer, size)) { + tsdbError("vgId:%d manifest file %s is broken since checksum error", REPO_ID(pRepo), pManifest->fname); + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return -1; + } + + if (mRecord.type == TSDB_MANIFEST_DATA_RECORD) { + // func1(pManifest->pBuffer, mRecord.len, isManifestEnd); + } else if (mRecord.type == TSDB_MANIFEST_META_RECORD) { + // func2(pManifest->pBuffer, mRecord.len, isManifestEnd); + } else { + ASSERT(0); + } + } + + return 0; + +} + +int tsdbEncodeManifestRecord(SManifestHandle *pManifest) { + pManifest->contSize = 0; + +} + +static int tsdbEncodeManifestHeader(void **buffer) { + int len = taosEncodeFixedU32(buf, TSDB_MANIFEST_FILE_VERSION); + return len; +} + +static void *tsdbDecodeManifestHeader(void *buffer, uint32_t version) { + buffer = taosDecodeFixedU32(buffer, &version); + return buffer; +} + +static int tsdbWriteManifestHeader(STsdbRepo *pRepo, SManifestHandle *pManifest) { + char buffer[TSDB_MANIFEST_FILE_HEADER_SIZE] = "\0"; + tsdbEncodeManifestHeader(&buffer); + + taosCalcChecksumAppend(0, (uint8_t)buffer, TSDB_MANIFEST_FILE_HEADER_SIZE); + if (taosTWrite(pManifest->fd, buffer, TSDB_MANIFEST_FILE_HEADER_SIZE) < 0) { + tsdbError("vgId:%d failed to write file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 4cf8ddd4bd8df396352ad66b8499552018d5d322..8cd3ec6dab9cb6efb806531f66d37a5168b8b722 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -464,19 +464,21 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } static void *tsdbCommitData(void *arg) { - STsdbRepo * pRepo = (STsdbRepo *)arg; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - ASSERT(pRepo->commit == 1); - ASSERT(pMem != NULL); + STsdbRepo * pRepo = (STsdbRepo *)arg; + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &pRepo->config; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SCommitHandle commitHandle = {0}; + SCommitHandle *pCommitH = &commitHandle; + + ASSERT(pRepo->commit == 1 && pMem != NULL); tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + pCommitH->pRepo = pRepo; + if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit; + // Create the iterator to read from cache if (pMem->numOfRows > 0) { iters = tsdbCreateCommitIters(pRepo); @@ -485,7 +487,7 @@ static void *tsdbCommitData(void *arg) { goto _exit; } - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) { tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _exit; } @@ -502,7 +504,7 @@ static void *tsdbCommitData(void *arg) { // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) { tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _exit; } @@ -517,23 +519,34 @@ static void *tsdbCommitData(void *arg) { tsdbFitRetention(pRepo); + if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) { + // TODO + } + + tsdbApplyManifestAction(&pCommitH->manifest); + _exit: tdFreeDataCols(pDataCols); tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); + tsdbCloseManifestHandle(&(pCommitH->manifest)); + tsdbDestroyHelper(&(pCommitH->whelper)); tsdbEndCommit(pRepo); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); return NULL; } -static int tsdbCommitMeta(STsdbRepo *pRepo) { +static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) { SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; SActObj * pAct = NULL; SActCont * pCont = NULL; if (listNEles(pMem->actList) > 0) { + pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer)); + + if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err; + if (tdKVStoreStartCommit(pMeta->pStore) < 0) { tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index cf0577717106a99d17abd521a01de787a57265d4..e52f5819348d9cf921bd7268f96ccac5d853f6bd 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -472,11 +472,7 @@ int tsdbOpenMeta(STsdbRepo *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; ASSERT(pMeta != NULL); - fname = tsdbGetMetaFileName(pRepo->rootDir); - if (fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } + if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname) < 0) goto _err; pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo); if (pMeta->pStore == NULL) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 357093bd9e125567ef3d3629c00877f54778b500..74b8df94ba1fb9ddceb4d5a7486ae90d22d5c5c7 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -113,9 +113,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { // Set the files pHelper->files.fGroup = *pGroup; if (helperType(pHelper) == TSDB_WRITE_HELPER) { - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, + tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname); - tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST, + tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname); } diff --git a/src/tsdb/src/tsdbUtil.c b/src/tsdb/src/tsdbUtil.c new file mode 100644 index 0000000000000000000000000000000000000000..18d814e77ad32bf6a3bd2bb651c02b3ba50a7399 --- /dev/null +++ b/src/tsdb/src/tsdbUtil.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "libgen.h" +#include "stdio.h" + +#include "tsdbMain.h" + +#define TSDB_DATA_DIR_NAME "data" + +const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".manifest", "meta", "config"}; + +int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char **fname) { + if (*fname == NULL) { + *fname = (char *)malloc(TSDB_FILENAME_LEN); + if (*fname == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + + switch (type) { + case TSDB_FILE_TYPE_HEAD: + case TSDB_FILE_TYPE_DATA: + case TSDB_FILE_TYPE_LAST: + if (seq == 0) { // For backward compatibility + snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, + tsdbFileSuffix[type]); + } else { + snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid, + tsdbFileSuffix[type], seq); + } + break; + case TSDB_FILE_TYPE_MANIFEST: + snprintf(*fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]); + break; + case TSDB_FILE_TYPE_META: + case TSDB_FILE_TYPE_CFG: + snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]); + break; + default: + ASSERT(0); + break; + } + + return 0; +} + +int tsdbParseFileName(char *fname, int *type, int *vid, int *fid, int *seq) { + // TODO + return 0; +} + +int tsdbGetNextSeqNum(int currentNum) { + if (currentNum == 0) { + return 1; + } else { + return 0; + } +} + +// ========================= TEST ========================= +#include +#include + +int main(int argc, char const *argv[]) +{ + char *fname = "/root/vnode0/data/v0f1897.head-1"; + char *bname = basename(fname); + int vid = 0; + int fid = 0; + int seq = 0; + + sscanf(bname, "v%df%d", &vid, &fid); + sscanf(bname, "*%d", NULL, &seq); + + printf("vid:%d fid:%d seq:%d", vid, fid, seq); + return 0; +} diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index b2b0ff05f58478e3778d3abab72ae3511f683aca..36acb83f6900d1089280d2270397a9ae0d21834b 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -37,8 +37,6 @@ typedef struct { typedef struct { char * fname; int fd; - char * fsnap; - int sfd; char * fnew; int nfd; SHashObj * map; @@ -46,6 +44,7 @@ typedef struct { afterFunc aFunc; void * appH; SStoreInfo info; + SStoreInfo ninfo; } SKVStore; #define KVSTORE_MAGIC(s) (s)->info.magic @@ -57,8 +56,10 @@ void tdCloseKVStore(SKVStore *pStore); int tdKVStoreStartCommit(SKVStore *pStore); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); -int tdKVStoreEndCommit(SKVStore *pStore); +int tdKVStoreEndCommit(SKVStore *pStore, bool hasError); void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size); +int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo); +void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); #ifdef __cplusplus } diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 6ba1d87d92ecf216bfde346f9d3ff1563515d34d..c54a0f8a3aa7471951c2eb530d9ba27561ce6b22 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -39,8 +39,6 @@ typedef struct { } SKVRecord; static int tdInitKVStoreHeader(int fd, char *fname); -static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo); -static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); static char * tdGetKVStoreSnapshotFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata); @@ -105,41 +103,6 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH goto _err; } - pStore->sfd = open(pStore->fsnap, O_RDONLY); - if (pStore->sfd < 0) { - if (errno != ENOENT) { - uError("failed to open file %s since %s", pStore->fsnap, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - } else { - uDebug("file %s exists, try to recover the KV store", pStore->fsnap); - if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info, &version) < 0) { - if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err; - } else { - if (version != KVSTORE_FILE_VERSION) { - uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fsnap, - version, KVSTORE_FILE_VERSION); - } - - if (taosFtruncate(pStore->fd, info.size) < 0) { - uError("failed to truncate %s to %" PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; - if (fsync(pStore->fd) < 0) { - uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); - goto _err; - } - } - - close(pStore->sfd); - pStore->sfd = -1; - (void)remove(pStore->fsnap); - } - if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err; if (version != KVSTORE_FILE_VERSION) { uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version, @@ -161,10 +124,7 @@ _err: close(pStore->fd); pStore->fd = -1; } - if (pStore->sfd > 0) { - close(pStore->sfd); - pStore->sfd = -1; - } + tdFreeKVStore(pStore); return NULL; } @@ -174,55 +134,27 @@ void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); } int tdKVStoreStartCommit(SKVStore *pStore) { ASSERT(pStore->fd < 0); - pStore->fd = open(pStore->fname, O_RDWR); + pStore->fd = open(pStore->fname, O_WRONLY); if (pStore->fd < 0) { uError("failed to open file %s since %s", pStore->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } - pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755); - if (pStore->sfd < 0) { - uError("failed to open file %s since %s", pStore->fsnap, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (taosTSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { - uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (fsync(pStore->sfd) < 0) { - uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (close(pStore->sfd) < 0) { - uError("failed to close file %s since %s", pStore->fsnap, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - pStore->sfd = -1; - - if (lseek(pStore->fd, 0, SEEK_END) < 0) { + off_t ret = lseek(pStore->fd, 0, SEEK_END); + if (ret < 0) { uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } - ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR)); + ASSERT(pStore->info.size == ret); + + pStore->ninfo = pStore->info; return 0; _err: - if (pStore->sfd > 0) { - close(pStore->sfd); - pStore->sfd = -1; - (void)remove(pStore->fsnap); - } if (pStore->fd > 0) { close(pStore->fd); pStore->fd = -1; @@ -259,14 +191,14 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe return -1; } - pStore->info.magic = - taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); - pStore->info.size += (sizeof(SKVRecord) + contLen); + pStore->ninfo.magic = + taosCalcChecksum(pStore->ninfo.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + pStore->ninfo.size += (sizeof(SKVRecord) + contLen); SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid)); if (pRecord != NULL) { // just to insert - pStore->info.tombSize += pRecord->size; + pStore->ninfo.tombSize += pRecord->size; } else { - pStore->info.nRecords++; + pStore->ninfo.nRecords++; } taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); @@ -298,11 +230,11 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { return -1; } - pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); - pStore->info.size += POINTER_DISTANCE(pBuf, buf); - pStore->info.nDels++; - pStore->info.nRecords--; - pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + pStore->ninfo.magic = taosCalcChecksum(pStore->ninfo.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf)); + pStore->ninfo.size += POINTER_DISTANCE(pBuf, buf); + pStore->ninfo.nDels++; + pStore->ninfo.nRecords--; + pStore->ninfo.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid)); uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname); @@ -310,25 +242,35 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { return 0; } -int tdKVStoreEndCommit(SKVStore *pStore) { +int tdKVStoreEndCommit(SKVStore *pStore, bool hasError) { ASSERT(pStore->fd > 0); - if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1; + if (!hasError) { + pStore->info = pStore->ninfo; - if (fsync(pStore->fd) < 0) { - uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) { + close(pStore->fd); + pStore->fd = -1; + return -1; + } + + if (fsync(pStore->fd) < 0) { + uError("failed to fsync file %s since %s", pStore->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + close(pStore->fd); + pStore->fd = -1; + return -1; + } } if (close(pStore->fd) < 0) { uError("failed to close file %s since %s", pStore->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + pStore->fd = -1; return -1; } pStore->fd = -1; - (void)remove(pStore->fsnap); return 0; } @@ -359,6 +301,27 @@ _err: *size = 0; } +int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) { + int tlen = 0; + tlen += taosEncodeVariantI64(buf, pInfo->size); + tlen += taosEncodeVariantI64(buf, pInfo->tombSize); + tlen += taosEncodeVariantI64(buf, pInfo->nRecords); + tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); + + return tlen; +} + +void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + + return buf; +} + static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; @@ -417,27 +380,6 @@ static int tdInitKVStoreHeader(int fd, char *fname) { return tdUpdateKVStoreHeader(fd, fname, &info); } -static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) { - int tlen = 0; - tlen += taosEncodeVariantI64(buf, pInfo->size); - tlen += taosEncodeVariantI64(buf, pInfo->tombSize); - tlen += taosEncodeVariantI64(buf, pInfo->nRecords); - tlen += taosEncodeVariantI64(buf, pInfo->nDels); - tlen += taosEncodeFixedU32(buf, pInfo->magic); - - return tlen; -} - -static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) { - buf = taosDecodeVariantI64(buf, &(pInfo->size)); - buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); - buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); - buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - - return buf; -} - static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) { SKVStore *pStore = (SKVStore *)calloc(1, sizeof(SKVStore)); if (pStore == NULL) goto _err; @@ -448,16 +390,10 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void goto _err; } - pStore->fsnap = tdGetKVStoreSnapshotFname(fname); - if (pStore->fsnap == NULL) { - goto _err; - } - pStore->fnew = tdGetKVStoreNewFname(fname); if (pStore->fnew == NULL) goto _err; pStore->fd = -1; - pStore->sfd = -1; pStore->nfd = -1; pStore->iFunc = iFunc; pStore->aFunc = aFunc; @@ -478,7 +414,6 @@ _err: static void tdFreeKVStore(SKVStore *pStore) { if (pStore) { taosTFree(pStore->fname); - taosTFree(pStore->fsnap); taosTFree(pStore->fnew); taosHashCleanup(pStore->map); free(pStore);