提交 e27b797f 编写于 作者: H Hongze Cheng

submit part of codes

上级 8b288640
......@@ -156,18 +156,12 @@ typedef enum {
TSDB_FILE_TYPE_HEAD = 0,
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_STAT,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NDATA,
TSDB_FILE_TYPE_NLAST,
TSDB_FILE_TYPE_NSTAT
TSDB_FILE_TYPE_MANIFEST,
TSDB_FILE_TYPE_META,
TSDB_FILE_TYPE_CFG
} TSDB_FILE_TYPE;
#ifndef TDINTERNAL
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
#else
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
#endif
typedef struct {
uint32_t magic;
......@@ -552,8 +546,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
#define IS_REPO_LOCKED(r) (r)->repoLocked
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
char* tsdbGetMetaFileName(char* rootDir);
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
int tsdbGetFileName(char* rootDir, int type, int vid, int fid, int seq, char** fname);
int tsdbLockRepo(STsdbRepo* pRepo);
int tsdbUnlockRepo(STsdbRepo* pRepo);
char* tsdbGetDataDirName(char* rootDir);
......@@ -572,6 +565,14 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx);
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
// -------------------------- ADDED --------------------------
typedef struct {
STsdbRepo* pRepo;
char fname[TSDB_FILENAME_LEN]; // manifest file name
int fd;
void* pBuffer;
SList* pModLog;
} SCommitHandle;
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "tsdbMain.h"
#include "tchecksum.h"
#define TSDB_DATA_FILE_CHANGE 0
#define TSDB_META_FILE_CHANGE 1
typedef struct {
int maxIters;
SCommitIter *pIters;
SRWHelper whelper;
SDataCols * pDataCols;
} STSCommitHandle;
typedef struct {
int32_t len;
int32_t type;
char change[];
} STsdbFileChange;
typedef struct {
char oname[TSDB_FILENAME_LEN];
char nname[TSDB_FILENAME_LEN];
SStoreInfo info;
} SMetaFileChange;
typedef struct {
SFileGroup ofgroup;
SFileGroup nfgroup;
} SDataFileChange;
int tsdbCommitData(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1 && pRepo->imem != NULL);
SCommitHandle commitHandle = {0};
SCommitHandle *pCommitH = &commitHandle;
pCommitH->pRepo = pRepo;
if (tsdbStartCommit(pCommitH) < 0) return -1;
if (tsdbCommitTimeSeriesData(pCommitH) < 0) goto _err;
if (tsdbCommitMetaData(pCommitH) < 0) goto _err;
if (tsdbApplyRetention(pCommitH) < 0) goto _err;
tsdbEndCommit(pCommitH, false);
return 0;
_err:
tsdbEndCommit(pCommitH, true);
return -1;
}
static int tsdbStartCommit(SCommitHandle *pCommitH) {
STsdbRepo *pRepo = pCommitH->pRepo;
SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = &(pRepo->config);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
pCommitH->pModLog = tdListNew(sizeof(void *));
if (pCommitH->pModLog == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pCommitH->fd = -1;
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname);
pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755);
if (pCommitH->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
return 0;
_err:
if (pCommitH->fd >= 0) {
close(pCommitH->fd);
pCommitH->fd = -1;
remove(pCommitH->fname);
}
tdListFree(pCommitH->pModLog);
return -1;
}
static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) {
// TODO: append commit over flag
if (false /* tsdbLogCommitOver(pCommitH) < 0 */) {
hasError = true;
}
tsdbInfo("vgId:%d commit over, commit status: %s", REPO_ID(pRepo), hasError ? "FAILED" : "SUCCEED");
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pCommitH->pModLog)) != NULL) {
STsdbFileChange *pChange = (STsdbFileChange *)(*(void **)pNode->data);
tsdbApplyFileChange(pChange, !hasError);
free(pNode);
free(pChange);
}
close(pCommitH->fd);
pCommitH->fd = -1;
remove(pCommitH->fname);
tdListFree(pCommitH->pModLog);
return;
}
static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
STsdbRepo * pRepo = pCommitH->pRepo;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &(pRepo->config);
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup[i];
if (pFGroup->fileId < mfid) {
STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(STsdbFileChange) + sizeof(STsdbFileChange));
if (pChange == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pChange->type = TSDB_DATA_FILE_CHANGE;
SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change;
pDataFileChange->ofgroup = pFGroup;
} else {
break;
}
}
if (pMem->numOfRows <= 0) return 0;
// Initialize resources
STSCommitHandle tsCommitH = {0};
if (tsdbInitTSCommitHandle(&tsCommitH, pRepo) < 0) return -1;
// Commit Time-Series data file by file
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
for (int fid = sfid; fid <= efid; fid++) {
TSKEY minKey = 0, maxKey = 0;
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
if (fid < mfid) {
// TODO: skip data in this file beyond retentioin and continue;
continue;
}
if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue;
{
// TODO: manifest log file group action
}
if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
}
tsdbDestroyTSCommitHandle(&tsCommitH);
return 0;
_err:
tsdbDestroyTSCommitHandle(&tsCommitH);
return -1;
}
// Function to commit meta data
static int tsdbCommitMetaData(SCommitHandle *pCommitH) {
STsdbRepo *pRepo = pCommitH->pRepo;
SKVStore * pStore = pRepo->tsdbMeta->pStore;
SMemTable *pMem = pRepo->imem;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) <= 0) return 0;
// Log meta file change
if (tsdbLogMetaFileChange(pCommitH) < 0) return -1;
// Commit data
if (tdKVStoreStartCommit(pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pStore, true /*hasErro*/);
return -1;
}
} else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno));
tdKVStoreEndCommit(pStore, true /*hasErro*/);
return -1;
}
} else {
ASSERT(false);
}
}
if (tdKVStoreEndCommit(pMeta->pStore, false /*hasError = false*/) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
return 0;
}
static int tsdbApplyRetention(SCommitHandle *pCommitH) {
// TODO
return 0;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
for (int i = 0; i < pMem->maxTables; i++) {
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tSkipListIterNext(iters[i].pIter);
}
}
return iters;
_err:
tsdbDestroyCommitIters(iters, pMem->maxTables);
return NULL;
}
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
if (iters == NULL) return;
for (int i = 1; i < maxTables; i++) {
if (iters[i].pTable != NULL) {
tsdbUnRefTable(iters[i].pTable);
tSkipListDestroyIter(iters[i].pIter);
}
}
free(iters);
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH * pFileH = pRepo->tsdbFileH;
SFileGroup * pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
SCommitIter *iters = pTSCh->pIters;
SRWHelper * pHelper = &(pTSCh->whelper);
SDataCols * pDataCols = pTSCh->pDataCols;
// Create and open files for commit
dataDir = tsdbGetDataDirName(pRepo->rootDir);
if (dataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
}
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMem = pRepo->imem;
pTSCh->pIters = tsdbCreateCommitIters(pRepo);
if (pTSCh->pIters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh);
return -1;
}
pTSCh->maxIters = pMem->maxTables;
if (tsdbInitWriteHelper(&(pTSCh->whelper), pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh);
return -1;
}
pTSCh->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
if (pTSCh->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh);
return -1;
}
return 0;
}
static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) {
if (pTSCh) {
tdFreeDataCols(pTSCh->pDataCols);
tsdbDestroyHelper(&(pTSCh->whelper));
tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters);
}
}
static int tsdbLogFileChange(SCommitHandle *pCommitH, STsdbFileChange *pChange) {
STsdbRepo *pRepo = pCommitH->pRepo;
pChange->len = tsdbEncodeFileChange(NULL, pChange) + sizeof(TSCKSUM);
if ((pCommitH->pBuffer = taosTRealloc(pCommitH->pBuffer, pChange->len)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (taosTWrite(pCommitH->fd, (void *)pChange, sizeof(*pChange)) < sizeof(*pChange)) {
tsdbError("vgId:%d failed to write file change to file %s since %s", REPO_ID(pRepo), pCommitH->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
int tsize = tsdbEncodeFileChange(pCommitH->pBuffer, pChange);
ASSERT(tsize + sizeof(TSCKSUM) == pChange->len);
taosCalcChecksumAppend(0, pCommitH->pBuffer, pChange->len);
if (taosTWrite(pCommitH->fd, pCommitH->pBuffer, pChange->len) < pChange->len) {
tsdbError("vgId:%d failed to write file change encode to file %s, bytes %d since %s", REPO_ID(pRepo),
pCommitH->fname, pChange->len, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (fsync(pCommitH->fd) < 0) {
tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) {
int tsize = 0;
if (pChange->type == TSDB_META_FILE_CHANGE) {
SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change;
tsize += taosEncodeString(buf, pMetaChange->oname);
tsize += taosEncodeString(buf, pMetaChange->nname);
tsize += tdEncodeStoreInfo(buf, pMetaChange->info);
} else if (pChange->type == TSDB_DATA_FILE_CHANGE) {
SDataFileChange *pDataChange = (SDataFileChange *)pChange->change;
// TODO
} else {
ASSERT(false);
}
return tsize;
}
static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) {
// TODO
return buf;
}
static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) {
STsdbRepo *pRepo = pCommitH->pRepo;
SKVStore * pStore = pRepo->tsdbMeta->pStore;
STsdbFileChange *pChange = (STsdbFileChange *)calloc(1, sizeof(*pChange) + sizeof(SMetaFileChange));
if (pChange == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pChange->type = TSDB_META_FILE_CHANGE;
SMetaFileChange *pMetaChange = (SMetaFileChange *)(pChange->change);
strncpy(pMetaChange->oname, pStore->fname, TSDB_FILENAME_LEN);
strncpy(pMetaChange->nname, pStore->fname, TSDB_FILENAME_LEN);
pMetaChange->info = pStore->info;
if (tsdbLogFileChange(pCommitH, pChange) < 0) {
free(pChange);
return -1;
}
tdListPrepend(pCommitH->pModLog, &pChange);
return 0;
}
static int
static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) {
if (pChange->type == TSDB_META_FILE_CHANGE) {
SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change;
if (isCommitEnd) {
if (strncmp(pMetaChange->oname, pMetaChange->nname) != 0) {
(void)remove(pMetaChange->oname);
}
} else { // roll back
// TODO
}
} else if (pChange->len == TSDB_DATA_FILE_CHANGE) {
} else {
ASSERT(0);
}
return 0;
}
\ No newline at end of file
......@@ -23,9 +23,6 @@
#include "tsdbMain.h"
#include "tutil.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2);
......@@ -128,7 +125,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
if (fid < mfid) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbGetDataFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname);
tsdbGetFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname);
(void)remove(fname);
}
continue;
......@@ -345,7 +342,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->fd = -1;
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname);
......@@ -525,7 +522,7 @@ _err:
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
......
......@@ -23,9 +23,6 @@
#include "tsdb.h"
#include "tulog.h"
#define TSDB_CFG_FILE_NAME "config"
#define TSDB_DATA_DIR_NAME "data"
#define TSDB_META_FILE_NAME "meta"
#define TSDB_META_FILE_INDEX 10000000
#define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
......@@ -49,7 +46,6 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg);
static int32_t tsdbUnsetRepoEnv(char *rootDir);
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg);
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg);
static char * tsdbGetCfgFname(char *rootDir);
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
......@@ -233,7 +229,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
fname = tsdbGetMetaFileName(pRepo->rootDir);
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname);
*index = TSDB_META_FILE_INDEX;
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
} else {
......@@ -345,22 +341,6 @@ int tsdbGetState(TSDB_REPO_T *repo) {
}
// ----------------- INTERNAL FUNCTIONS -----------------
char *tsdbGetMetaFileName(char *rootDir) {
int tlen = (int)(strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 2);
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s", rootDir, TSDB_META_FILE_NAME);
return fname;
}
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
}
int tsdbLockRepo(STsdbRepo *pRepo) {
int code = pthread_mutex_lock(&pRepo->mutex);
if (code != 0) {
......@@ -557,15 +537,11 @@ static int32_t tsdbUnsetRepoEnv(char *rootDir) {
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
int fd = -1;
char *fname = NULL;
char fname[TSDB_FILENAME_LEN] = "\0";
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
char *pBuf = buf;
fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) {
tsdbError("vgId:%d failed to save configuration since %s", pCfg->tsdbId, tstrerror(terrno));
goto _err;
}
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname);
fd = open(fname, O_WRONLY | O_CREAT, 0755);
if (fd < 0) {
......@@ -592,26 +568,20 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err;
}
free(fname);
close(fd);
return 0;
_err:
taosTFree(fname);
if (fd >= 0) close(fd);
return -1;
}
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
char *fname = NULL;
int fd = -1;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
char fname[TSDB_FILENAME_LEN] = "\0";
int fd = -1;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tsdbGetFileName(rootDir, TSDB_FILE_TYPE_CFG, 0, 0, 0, &fname);
fd = open(fname, O_RDONLY);
if (fd < 0) {
......@@ -634,29 +604,14 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
tsdbDecodeCfg(buf, pCfg);
taosTFree(fname);
close(fd);
return 0;
_err:
taosTFree(fname);
if (fd >= 0) close(fd);
return -1;
}
static char *tsdbGetCfgFname(char *rootDir) {
int tlen = (int)(strlen(rootDir) + strlen(TSDB_CFG_FILE_NAME) + 2);
char *fname = calloc(1, tlen);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
snprintf(fname, tlen, "%s/%s", rootDir, TSDB_CFG_FILE_NAME);
return fname;
}
static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo));
if (pRepo == NULL) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "tchecksum.h"
#include "tsdbMain.h"
#define TSDB_MANIFEST_FILE_VERSION 0
#define TSDB_MANIFEST_FILE_HEADER_SIZE 128
#define TSDB_MANIFEST_END "C0D09F476DEF4A32B694A6A9E7B7B240"
#define TSDB_MANIFEST_END_SIZE 32
#define TSDB_MANIFEST_END_RECORD 0
#define TSDB_MANIFEST_META_RECORD 1
#define TSDB_MANIFEST_DATA_RECORD 2
typedef struct {
int type;
int len;
} SManifestRecord;
int tsdbInitManifestHandle(STsdbRepo *pRepo, SManifestHandle *pManifest) {
STsdbCfg *pCfg = &(pRepo->config);
pManifest->pBuffer = NULL;
pManifest->contSize = 0;
tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pManifest->fname));
pManifest->fd = open(pManifest->fname, O_CREAT | O_APPEND, 0755);
if (pManifest->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tsdbWriteManifestHeader(pRepo, pManifest) < 0) {
tsdbCloseManifestHandle(pRepo, pManifest);
return -1;
}
return 0;
}
void tsdbCloseManifestHandle(SManifestHandle *pManifest) {
if (pManifest != NULL && pManifest->fd > 0) {
close(pManifest->fd);
pManifest->fd = -1;
}
remove(pManifest->fname);
taosTZfree(pManifest->pBuffer);
pManifest->pBuffer = NULL;
pManifest->contSize = 0;
return 0;
}
int tsdbAppendManifestRecord(SManifestHandle *pManifest, STsdbRepo *pRepo, int type) {
ASSERT(pManifest->pBuffer != NULL && taosTSizeof(pManifest->pBuffer) >= pManifest->contSize);
if (pManifest->contSize > 0) {
if (tsdbManifestMakeMoreRoom(pManifest, sizeof(TSCKSUM)) < 0) return -1;
pManifest->contSize += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pManifest->pBuffer, pManifest->contSize);
}
SManifestRecord mRecord = {.type = type, .len = pManifest->contSize};
// Write mRecord part
if (taosTWrite(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < sizeof(mRecord)) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), sizeof(mRecord), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// Write buffer part
if (pManifest->contSize > 0 && taosTWrite(pManifest->fd, pManifest->pBuffer, pManifest->contSize) < pManifest->contSize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), pManifest->contSize,
pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (fsync(pManifest->fd) < 0) {
tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
int tsdbAppendManifestEnd(SManifestHandle *pManifest, STsdbRepo *pRepo) {
pManifest->contSize = 0;
return tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_END_RECORD);
}
int tsdbManifestMakeRoom(SManifestHandle *pManifest, int expectedSize) {
pManifest->pBuffer = taosTRealloc(pManifest->pBuffer, expectedSize);
if (pManifest->pBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int tsdbManifestMakeMoreRoom(SManifestHandle *pManifest, int moreSize) {
return tsdbManifestMakeRoom(pManifest, pManifest->contSize + moreSize);
}
// TODO
bool tsdbIsManifestEnd(SManifestHandle *pManifest) {
SManifestRecord mRecord;
if (lseek(pManifest->fd, sizeof(mRecord), SEEK_END) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return false;
}
if (taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < 0) {
tsdbError("vgId:%d failed to read manifest end from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return false;
}
return (mRecord.type == TSDB_MANII)
}
int tsdbManifestRollBackOrForward(SManifestHandle *pManifest, bool isManifestEnd, STsdbRepo *pRepo) {
SManifestRecord mRecord;
if (lseek(pManifest->fd, TSDB_MANIFEST_FILE_HEADER_SIZE, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return;
}
while (true) {
ssize_t size = 0;
size = taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord));
if (size < 0) {
tsdbError("vgId:%d failed to read SManifestRecord part from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (size < sizeof(mRecord)) break;
if ((mRecord.type != TSDB_MANIFEST_DATA_RECORD && mRecord.type != TSDB_MANIFEST_META_RECORD && mRecord.type != TSDB_MANIFEST_END_RECORD) || mRecord.len < 0) {
tsdbError("vgId:%d manifest file %s is broken since invalid mRecord content", REPO_ID(pRepo), pManifest->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (mRecord.type == TSDB_MANIFEST_END_RECORD) {
ASSERT(isManifestEnd && mRecord.len == 0);
break;
}
if (tsdbManifestMakeRoom(pManifest, mRecord.len) < 0) return -1;
size = taosTRead(pManifest->fd, pManifest->pBuffer, mRecord.len);
if (size < 0) {
tsdbError("vgId:%d failed to read SManifestRecord content from file %s since %s", REPO_ID(pRepo), pManifest->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (size < mRecord.len) break;
if (!taosCheckChecksumWhole((uint8_t *)pManifest->pBuffer, size)) {
tsdbError("vgId:%d manifest file %s is broken since checksum error", REPO_ID(pRepo), pManifest->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return -1;
}
if (mRecord.type == TSDB_MANIFEST_DATA_RECORD) {
// func1(pManifest->pBuffer, mRecord.len, isManifestEnd);
} else if (mRecord.type == TSDB_MANIFEST_META_RECORD) {
// func2(pManifest->pBuffer, mRecord.len, isManifestEnd);
} else {
ASSERT(0);
}
}
return 0;
}
int tsdbEncodeManifestRecord(SManifestHandle *pManifest) {
pManifest->contSize = 0;
}
static int tsdbEncodeManifestHeader(void **buffer) {
int len = taosEncodeFixedU32(buf, TSDB_MANIFEST_FILE_VERSION);
return len;
}
static void *tsdbDecodeManifestHeader(void *buffer, uint32_t version) {
buffer = taosDecodeFixedU32(buffer, &version);
return buffer;
}
static int tsdbWriteManifestHeader(STsdbRepo *pRepo, SManifestHandle *pManifest) {
char buffer[TSDB_MANIFEST_FILE_HEADER_SIZE] = "\0";
tsdbEncodeManifestHeader(&buffer);
taosCalcChecksumAppend(0, (uint8_t)buffer, TSDB_MANIFEST_FILE_HEADER_SIZE);
if (taosTWrite(pManifest->fd, buffer, TSDB_MANIFEST_FILE_HEADER_SIZE) < 0) {
tsdbError("vgId:%d failed to write file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -464,19 +464,21 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
STsdbRepo * pRepo = (STsdbRepo *)arg;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitHandle commitHandle = {0};
SCommitHandle *pCommitH = &commitHandle;
ASSERT(pRepo->commit == 1 && pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
pCommitH->pRepo = pRepo;
if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit;
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
......@@ -485,7 +487,7 @@ static void *tsdbCommitData(void *arg) {
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
......@@ -502,7 +504,7 @@ static void *tsdbCommitData(void *arg) {
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
......@@ -517,23 +519,34 @@ static void *tsdbCommitData(void *arg) {
tsdbFitRetention(pRepo);
if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) {
// TODO
}
tsdbApplyManifestAction(&pCommitH->manifest);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbCloseManifestHandle(&(pCommitH->manifest));
tsdbDestroyHelper(&(pCommitH->whelper));
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) {
SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta;
SActObj * pAct = NULL;
SActCont * pCont = NULL;
if (listNEles(pMem->actList) > 0) {
pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer));
if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err;
if (tdKVStoreStartCommit(pMeta->pStore) < 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
......
......@@ -472,11 +472,7 @@ int tsdbOpenMeta(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
ASSERT(pMeta != NULL);
fname = tsdbGetMetaFileName(pRepo->rootDir);
if (fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_META, 0, 0, 0, &fname) < 0) goto _err;
pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo);
if (pMeta->pStore == NULL) {
......
......@@ -113,9 +113,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
pHelper->files.fGroup = *pGroup;
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
tsdbGetFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
helperNewLastF(pHelper)->fname);
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "libgen.h"
#include "stdio.h"
#include "tsdbMain.h"
#define TSDB_DATA_DIR_NAME "data"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".manifest", "meta", "config"};
int tsdbGetFileName(char *rootDir, int type, int vid, int fid, int seq, char **fname) {
if (*fname == NULL) {
*fname = (char *)malloc(TSDB_FILENAME_LEN);
if (*fname == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
switch (type) {
case TSDB_FILE_TYPE_HEAD:
case TSDB_FILE_TYPE_DATA:
case TSDB_FILE_TYPE_LAST:
if (seq == 0) { // For backward compatibility
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
tsdbFileSuffix[type]);
} else {
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s-%d", rootDir, TSDB_DATA_DIR_NAME, vid, fid,
tsdbFileSuffix[type], seq);
}
break;
case TSDB_FILE_TYPE_MANIFEST:
snprintf(*fname, TSDB_FILENAME_LEN, "%s/v%d%s", rootDir, vid, tsdbFileSuffix[type]);
break;
case TSDB_FILE_TYPE_META:
case TSDB_FILE_TYPE_CFG:
snprintf(*fname, TSDB_FILENAME_LEN, "%s/%s", rootDir, tsdbFileSuffix[type]);
break;
default:
ASSERT(0);
break;
}
return 0;
}
int tsdbParseFileName(char *fname, int *type, int *vid, int *fid, int *seq) {
// TODO
return 0;
}
int tsdbGetNextSeqNum(int currentNum) {
if (currentNum == 0) {
return 1;
} else {
return 0;
}
}
// ========================= TEST =========================
#include <stdio.h>
#include <libgen.h>
int main(int argc, char const *argv[])
{
char *fname = "/root/vnode0/data/v0f1897.head-1";
char *bname = basename(fname);
int vid = 0;
int fid = 0;
int seq = 0;
sscanf(bname, "v%df%d", &vid, &fid);
sscanf(bname, "*%d", NULL, &seq);
printf("vid:%d fid:%d seq:%d", vid, fid, seq);
return 0;
}
......@@ -37,8 +37,6 @@ typedef struct {
typedef struct {
char * fname;
int fd;
char * fsnap;
int sfd;
char * fnew;
int nfd;
SHashObj * map;
......@@ -46,6 +44,7 @@ typedef struct {
afterFunc aFunc;
void * appH;
SStoreInfo info;
SStoreInfo ninfo;
} SKVStore;
#define KVSTORE_MAGIC(s) (s)->info.magic
......@@ -57,8 +56,10 @@ void tdCloseKVStore(SKVStore *pStore);
int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore *pStore);
int tdKVStoreEndCommit(SKVStore *pStore, bool hasError);
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size);
int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo);
void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -39,8 +39,6 @@ typedef struct {
} SKVRecord;
static int tdInitKVStoreHeader(int fd, char *fname);
static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo);
static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
static char * tdGetKVStoreSnapshotFname(char *fdata);
static char * tdGetKVStoreNewFname(char *fdata);
......@@ -105,41 +103,6 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err;
}
pStore->sfd = open(pStore->fsnap, O_RDONLY);
if (pStore->sfd < 0) {
if (errno != ENOENT) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} else {
uDebug("file %s exists, try to recover the KV store", pStore->fsnap);
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info, &version) < 0) {
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
} else {
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fsnap,
version, KVSTORE_FILE_VERSION);
}
if (taosFtruncate(pStore->fd, info.size) < 0) {
uError("failed to truncate %s to %" PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
goto _err;
}
}
close(pStore->sfd);
pStore->sfd = -1;
(void)remove(pStore->fsnap);
}
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info, &version) < 0) goto _err;
if (version != KVSTORE_FILE_VERSION) {
uError("file %s version %u is not the same as program version %u, this may cause problem", pStore->fname, version,
......@@ -161,10 +124,7 @@ _err:
close(pStore->fd);
pStore->fd = -1;
}
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
}
tdFreeKVStore(pStore);
return NULL;
}
......@@ -174,55 +134,27 @@ void tdCloseKVStore(SKVStore *pStore) { tdFreeKVStore(pStore); }
int tdKVStoreStartCommit(SKVStore *pStore) {
ASSERT(pStore->fd < 0);
pStore->fd = open(pStore->fname, O_RDWR);
pStore->fd = open(pStore->fname, O_WRONLY);
if (pStore->fd < 0) {
uError("failed to open file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->sfd = open(pStore->fsnap, O_WRONLY | O_CREAT, 0755);
if (pStore->sfd < 0) {
uError("failed to open file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosTSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (fsync(pStore->sfd) < 0) {
uError("failed to fsync file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (close(pStore->sfd) < 0) {
uError("failed to close file %s since %s", pStore->fsnap, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->sfd = -1;
if (lseek(pStore->fd, 0, SEEK_END) < 0) {
off_t ret = lseek(pStore->fd, 0, SEEK_END);
if (ret < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(pStore->info.size == lseek(pStore->fd, 0, SEEK_CUR));
ASSERT(pStore->info.size == ret);
pStore->ninfo = pStore->info;
return 0;
_err:
if (pStore->sfd > 0) {
close(pStore->sfd);
pStore->sfd = -1;
(void)remove(pStore->fsnap);
}
if (pStore->fd > 0) {
close(pStore->fd);
pStore->fd = -1;
......@@ -259,14 +191,14 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
return -1;
}
pStore->info.magic =
taosCalcChecksum(pStore->info.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
pStore->info.size += (sizeof(SKVRecord) + contLen);
pStore->ninfo.magic =
taosCalcChecksum(pStore->ninfo.magic, (uint8_t *)POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
pStore->ninfo.size += (sizeof(SKVRecord) + contLen);
SKVRecord *pRecord = taosHashGet(pStore->map, (void *)&uid, sizeof(uid));
if (pRecord != NULL) { // just to insert
pStore->info.tombSize += pRecord->size;
pStore->ninfo.tombSize += pRecord->size;
} else {
pStore->info.nRecords++;
pStore->ninfo.nRecords++;
}
taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
......@@ -298,11 +230,11 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
return -1;
}
pStore->info.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pStore->info.size += POINTER_DISTANCE(pBuf, buf);
pStore->info.nDels++;
pStore->info.nRecords--;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
pStore->ninfo.magic = taosCalcChecksum(pStore->ninfo.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pStore->ninfo.size += POINTER_DISTANCE(pBuf, buf);
pStore->ninfo.nDels++;
pStore->ninfo.nRecords--;
pStore->ninfo.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid));
uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname);
......@@ -310,25 +242,35 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
return 0;
}
int tdKVStoreEndCommit(SKVStore *pStore) {
int tdKVStoreEndCommit(SKVStore *pStore, bool hasError) {
ASSERT(pStore->fd > 0);
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) return -1;
if (!hasError) {
pStore->info = pStore->ninfo;
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &(pStore->info)) < 0) {
close(pStore->fd);
pStore->fd = -1;
return -1;
}
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
close(pStore->fd);
pStore->fd = -1;
return -1;
}
}
if (close(pStore->fd) < 0) {
uError("failed to close file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
pStore->fd = -1;
return -1;
}
pStore->fd = -1;
(void)remove(pStore->fsnap);
return 0;
}
......@@ -359,6 +301,27 @@ _err:
*size = 0;
}
int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t *version) {
char buf[TD_KVSTORE_HEADER_SIZE] = "\0";
......@@ -417,27 +380,6 @@ static int tdInitKVStoreHeader(int fd, char *fname) {
return tdUpdateKVStoreHeader(fd, fname, &info);
}
static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
static void *tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH) {
SKVStore *pStore = (SKVStore *)calloc(1, sizeof(SKVStore));
if (pStore == NULL) goto _err;
......@@ -448,16 +390,10 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
goto _err;
}
pStore->fsnap = tdGetKVStoreSnapshotFname(fname);
if (pStore->fsnap == NULL) {
goto _err;
}
pStore->fnew = tdGetKVStoreNewFname(fname);
if (pStore->fnew == NULL) goto _err;
pStore->fd = -1;
pStore->sfd = -1;
pStore->nfd = -1;
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
......@@ -478,7 +414,6 @@ _err:
static void tdFreeKVStore(SKVStore *pStore) {
if (pStore) {
taosTFree(pStore->fname);
taosTFree(pStore->fsnap);
taosTFree(pStore->fnew);
taosHashCleanup(pStore->map);
free(pStore);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册