未验证 提交 0f3d18f8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1881 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -109,6 +109,8 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); ...@@ -109,6 +109,8 @@ int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid); TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
......
...@@ -151,8 +151,6 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); ...@@ -151,8 +151,6 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable);
STsdbMeta *tsdbGetMeta(TsdbRepoT *pRepo); STsdbMeta *tsdbGetMeta(TsdbRepoT *pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
...@@ -496,9 +494,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper); ...@@ -496,9 +494,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper);
int tsdbWriteCompIdx(SRWHelper *pHelper); int tsdbWriteCompIdx(SRWHelper *pHelper);
// --------- Other functions need to further organize // --------- Other functions need to further organize
void tsdbFitRetention(STsdbRepo *pRepo); void tsdbFitRetention(STsdbRepo *pRepo);
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
void tsdbAdjustCacheBlocks(STsdbCache *pCache); void tsdbAdjustCacheBlocks(STsdbCache *pCache);
int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "tscompression.h" #include "tscompression.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "ttime.h" #include "ttime.h"
#include <sys/stat.h>
int tsdbDebugFlag = 135; int tsdbDebugFlag = 135;
...@@ -133,6 +134,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) ...@@ -133,6 +134,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */)
*/ */
int32_t tsdbDropRepo(TsdbRepoT *repo) { int32_t tsdbDropRepo(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
int id = pRepo->config.tsdbId;
pRepo->state = TSDB_REPO_STATE_CLOSED; pRepo->state = TSDB_REPO_STATE_CLOSED;
...@@ -148,6 +150,8 @@ int32_t tsdbDropRepo(TsdbRepoT *repo) { ...@@ -148,6 +150,8 @@ int32_t tsdbDropRepo(TsdbRepoT *repo) {
free(pRepo->rootDir); free(pRepo->rootDir);
free(pRepo); free(pRepo);
tsdbTrace("vgId %d: tsdb repository is dropped!", id);
return 0; return 0;
} }
...@@ -238,6 +242,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -238,6 +242,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
pRepo->state = TSDB_REPO_STATE_ACTIVE; pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbTrace("vgId %d: open tsdb repository successfully!", pRepo->config.tsdbId);
return (TsdbRepoT *)pRepo; return (TsdbRepoT *)pRepo;
} }
...@@ -256,6 +261,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) { ...@@ -256,6 +261,7 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
int32_t tsdbCloseRepo(TsdbRepoT *repo) { int32_t tsdbCloseRepo(TsdbRepoT *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return 0; if (pRepo == NULL) return 0;
int id = pRepo->config.tsdbId;
pRepo->state = TSDB_REPO_STATE_CLOSED; pRepo->state = TSDB_REPO_STATE_CLOSED;
tsdbLockRepo(repo); tsdbLockRepo(repo);
...@@ -289,6 +295,8 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) { ...@@ -289,6 +295,8 @@ int32_t tsdbCloseRepo(TsdbRepoT *repo) {
tfree(pRepo->rootDir); tfree(pRepo->rootDir);
tfree(pRepo); tfree(pRepo);
tsdbTrace("vgId %d: repository is closed!", id);
return 0; return 0;
} }
...@@ -349,6 +357,7 @@ int32_t tsdbTriggerCommit(TsdbRepoT *repo) { ...@@ -349,6 +357,7 @@ int32_t tsdbTriggerCommit(TsdbRepoT *repo) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo); pthread_create(&(pRepo->commitThread), &thattr, tsdbCommitData, (void *)repo);
tsdbTrace("vgId %d: start to commit!", pRepo->config.tsdbId);
return 0; return 0;
} }
...@@ -376,11 +385,6 @@ STsdbRepoInfo *tsdbGetStatus(TsdbRepoT *pRepo) { ...@@ -376,11 +385,6 @@ STsdbRepoInfo *tsdbGetStatus(TsdbRepoT *pRepo) {
return NULL; return NULL;
} }
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
return tsdbCreateTableImpl(pRepo->tsdbMeta, pCfg);
}
int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) { int tsdbAlterTable(TsdbRepoT *pRepo, STableCfg *pCfg) {
// TODO // TODO
return 0; return 0;
...@@ -395,13 +399,6 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid) { ...@@ -395,13 +399,6 @@ TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, int64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable); return TSDB_GET_TABLE_LAST_KEY(pTable);
} }
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
if (repo == NULL) return -1;
STsdbRepo *pRepo = (STsdbRepo *)repo;
return tsdbDropTableImpl(pRepo->tsdbMeta, tableId);
}
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
// TODO // TODO
return NULL; return NULL;
...@@ -760,7 +757,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) { ...@@ -760,7 +757,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
return -1; return -1;
} }
tsdbError( tsdbTrace(
"id %d: set up tsdb environment succeed! cacheBlockSize %d, totalBlocks %d, maxTables %d, daysPerFile %d, keep " "id %d: set up tsdb environment succeed! cacheBlockSize %d, totalBlocks %d, maxTables %d, daysPerFile %d, keep "
"%d, minRowsPerFileBlock %d, maxRowsPerFileBlock %d, precision %d, compression%d", "%d, minRowsPerFileBlock %d, maxRowsPerFileBlock %d, precision %d, compression%d",
pRepo->config.tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep, pRepo->config.tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
...@@ -842,6 +839,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -842,6 +839,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
tsdbTrace("vgId %d, tid %d, uid " PRId64 "a row is inserted to table! key" PRId64,
pRepo->config.tsdbId, pTable->tableId.tid, pTable->tableId.uid, dataRowKey(row));
return 0; return 0;
} }
...@@ -1104,11 +1104,14 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK ...@@ -1104,11 +1104,14 @@ static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minK
} }
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) { static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
int8_t oldCompRession = pRepo->config.compression;
pRepo->config.compression = compression; pRepo->config.compression = compression;
tsdbTrace("vgId %d: tsdb compression is changed from %d to %d", oldCompRession, compression);
} }
static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
STsdbCfg *pCfg = &pRepo->config; STsdbCfg *pCfg = &pRepo->config;
int oldKeep = pCfg->keep;
int maxFiles = keep / pCfg->maxTables + 3; int maxFiles = keep / pCfg->maxTables + 3;
if (pRepo->config.keep > keep) { if (pRepo->config.keep > keep) {
...@@ -1120,8 +1123,57 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { ...@@ -1120,8 +1123,57 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
} }
pRepo->tsdbFileH->maxFGroups = maxFiles; pRepo->tsdbFileH->maxFGroups = maxFiles;
} }
tsdbTrace("id %d: keep is changed from %d to %d", pRepo->config.tsdbId, oldKeep, keep);
} }
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
// TODO // TODO
int oldMaxTables = pRepo->config.maxTables;
tsdbTrace("vgId %d: tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
}
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, int32_t *size) {
// TODO: need to refactor this function
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char fname[256] = "\0";
struct stat fState;
char *spath = strdup(pRepo->rootDir);
char *prefixDir = dirname(spath);
if (name[0] == 0) {
// Map index to the file name
int fid = (*index) / 3;
if (fid > pFileH->numOfFGroups) {
// return meta data file
if ((*index) % 3 > 0) { // it is finished
tfree(spath);
return 0;
} else {
tsdbGetMetaFileName(pRepo->rootDir, fname);
}
} else {
// return data file name
strcpy(fname, pFileH->fGroup[fid].files[(*index) % 3].fname);
}
strcpy(name, fname + strlen(spath));
} else {
// Name is provided, need to get the file info
sprintf(fname, "%s/%s", prefixDir, name);
}
if (stat(fname, &fState) < 0) {
tfree(spath);
return 0;
}
*size = fState.st_size;
magic = *size;
return magic;
} }
\ No newline at end of file
...@@ -283,7 +283,10 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) { ...@@ -283,7 +283,10 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) {
} }
} }
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbCheckTableCfg(pCfg) < 0) return -1; if (tsdbCheckTableCfg(pCfg) < 0) return -1;
STable *super = NULL; STable *super = NULL;
...@@ -351,8 +354,14 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -351,8 +354,14 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
} }
// Register to meta // Register to meta
if (newSuper) tsdbAddTableToMeta(pMeta, super, true); if (newSuper) {
tsdbAddTableToMeta(pMeta, super, true);
tsdbTrace("vgId %d: super table is created! uid " PRId64, pRepo->config.tsdbId,
super->tableId.uid);
}
tsdbAddTableToMeta(pMeta, table, true); tsdbAddTableToMeta(pMeta, table, true);
tsdbTrace("vgId %d: table is created! tid %d, uid " PRId64, pRepo->config.tsdbId,
super->tableId.tid, super->tableId.uid);
// Write to meta file // Write to meta file
int bufLen = 0; int bufLen = 0;
...@@ -385,13 +394,24 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { ...@@ -385,13 +394,24 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
return pTable; return pTable;
} }
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { // int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pRepo == NULL) return -1;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pMeta == NULL) return -1; if (pMeta == NULL) return -1;
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
if (pTable == NULL) return -1; if (pTable == NULL) {
tsdbError("vgId %d: failed to drop table since table not exists! tid %d, uid " PRId64, pRepo->config.tsdbId,
tableId.tid, tableId.uid);
return -1;
}
tsdbTrace("vgId %d: table is dropped! tid %s, uid " PRId64, pRepo->config.tsdbId, tableId.tid, tableId.uid);
if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1; if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
return 0; return 0;
} }
......
...@@ -28,7 +28,7 @@ typedef struct { ...@@ -28,7 +28,7 @@ typedef struct {
int64_t uid; int64_t uid;
} SRecordInfo; } SRecordInfo;
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); // static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
// static int32_t tsdbCheckMetaHeader(int fd); // static int32_t tsdbCheckMetaHeader(int fd);
static int32_t tsdbWriteMetaHeader(int fd); static int32_t tsdbWriteMetaHeader(int fd);
static int tsdbCreateMetaFile(char *fname); static int tsdbCreateMetaFile(char *fname);
...@@ -180,7 +180,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh) { ...@@ -180,7 +180,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh) {
tfree(mfh); tfree(mfh);
} }
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) { int32_t tsdbGetMetaFileName(char *rootDir, char *fname) {
if (rootDir == NULL) return -1; if (rootDir == NULL) return -1;
sprintf(fname, "%s/%s", rootDir, TSDB_META_FILE_NAME); sprintf(fname, "%s/%s", rootDir, TSDB_META_FILE_NAME);
return 0; return 0;
......
...@@ -383,9 +383,8 @@ static int vnodeWalCallback(void *arg) { ...@@ -383,9 +383,8 @@ static int vnodeWalCallback(void *arg) {
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
// SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
//tsdbGetFileInfo(pVnode->tsdb, name, index, size); return tsdbGetFileInfo(pVnode->tsdb, name, index, size);
return 0;
} }
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册