未验证 提交 71c7a72a 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1845 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -95,6 +95,7 @@ typedef struct STable { ...@@ -95,6 +95,7 @@ typedef struct STable {
void * streamHandler; // TODO void * streamHandler; // TODO
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
struct STable *next; // TODO: remove the next struct STable *next; // TODO: remove the next
struct STable *prev;
} STable; } STable;
#define TSDB_GET_TABLE_LAST_KEY(pTable) ((pTable)->lastKey) #define TSDB_GET_TABLE_LAST_KEY(pTable) ((pTable)->lastKey)
......
...@@ -90,6 +90,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg) { ...@@ -90,6 +90,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {
if (mkdir(rootDir, 0755) != 0) { if (mkdir(rootDir, 0755) != 0) {
tsdbError("id %d: failed to create rootDir! rootDir %s, reason %s", pCfg->tsdbId, rootDir, strerror(errno));
if (errno == EACCES) { if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS; return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) { } else if (errno == ENOSPC) {
...@@ -611,14 +612,20 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -611,14 +612,20 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (pCfg->precision == -1) { if (pCfg->precision == -1) {
pCfg->precision = TSDB_DEFAULT_PRECISION; pCfg->precision = TSDB_DEFAULT_PRECISION;
} else { } else {
if (!IS_VALID_PRECISION(pCfg->precision)) return -1; if (!IS_VALID_PRECISION(pCfg->precision)) {
tsdbError("id %d: invalid precision configuration! precision %d", pCfg->tsdbId, pCfg->precision);
return -1;
}
} }
// Check compression // Check compression
if (pCfg->compression == -1) { if (pCfg->compression == -1) {
pCfg->compression = TSDB_DEFAULT_COMPRESSION; pCfg->compression = TSDB_DEFAULT_COMPRESSION;
} else { } else {
if (!IS_VALID_COMPRESSION(pCfg->compression)) return -1; if (!IS_VALID_COMPRESSION(pCfg->compression)) {
tsdbError("id %d: invalid compression configuration! compression %d", pCfg->tsdbId, pCfg->precision);
return -1;
}
} }
// Check tsdbId // Check tsdbId
...@@ -628,30 +635,50 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -628,30 +635,50 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (pCfg->maxTables == -1) { if (pCfg->maxTables == -1) {
pCfg->maxTables = TSDB_DEFAULT_TABLES; pCfg->maxTables = TSDB_DEFAULT_TABLES;
} else { } else {
if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1; if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) {
tsdbError("id %d: invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d",
pCfg->tsdbId, pCfg->maxTables, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
return -1;
}
} }
// Check daysPerFile // Check daysPerFile
if (pCfg->daysPerFile == -1) { if (pCfg->daysPerFile == -1) {
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
} else { } else {
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1; if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) {
tsdbError(
"id %d: invalid daysPerFile configuration! daysPerFile %d TSDB_MIN_DAYS_PER_FILE %d TSDB_MAX_DAYS_PER_FILE "
"%d",
pCfg->tsdbId, pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
return -1;
}
} }
// Check minRowsPerFileBlock and maxRowsPerFileBlock // Check minRowsPerFileBlock and maxRowsPerFileBlock
if (pCfg->minRowsPerFileBlock == -1) { if (pCfg->minRowsPerFileBlock == -1) {
pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK; pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
} else { } else {
if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
tsdbError(
"id %d: invalid minRowsPerFileBlock configuration! minRowsPerFileBlock %d TSDB_MIN_MIN_ROW_FBLOCK %d "
"TSDB_MAX_MIN_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->minRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
return -1; return -1;
} }
}
if (pCfg->maxRowsPerFileBlock == -1) { if (pCfg->maxRowsPerFileBlock == -1) {
pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
} else { } else {
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
tsdbError(
"id %d: invalid maxRowsPerFileBlock configuration! maxRowsPerFileBlock %d TSDB_MIN_MAX_ROW_FBLOCK %d "
"TSDB_MAX_MAX_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->maxRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
return -1; return -1;
} }
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) return -1; if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) return -1;
...@@ -659,7 +686,13 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { ...@@ -659,7 +686,13 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (pCfg->keep == -1) { if (pCfg->keep == -1) {
pCfg->keep = TSDB_DEFAULT_KEEP; pCfg->keep = TSDB_DEFAULT_KEEP;
} else { } else {
if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1; if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) {
tsdbError(
"id %d: invalid keep configuration! keep %d TSDB_MIN_KEEP %d "
"TSDB_MAX_KEEP %d",
pCfg->tsdbId, pCfg->keep, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
return -1;
}
} }
return 0; return 0;
...@@ -716,15 +749,22 @@ static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname) { ...@@ -716,15 +749,22 @@ static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname) {
} }
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) { static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
STsdbCfg *pCfg = &pRepo->config;
if (tsdbSaveConfig(pRepo) < 0) return -1; if (tsdbSaveConfig(pRepo) < 0) return -1;
char dirName[128] = "\0"; char dirName[128] = "\0";
if (tsdbGetDataDirName(pRepo, dirName) < 0) return -1; if (tsdbGetDataDirName(pRepo, dirName) < 0) return -1;
if (mkdir(dirName, 0755) < 0) { if (mkdir(dirName, 0755) < 0) {
tsdbError("id %d: failed to create repository directory! reason %s", pRepo->config.tsdbId, strerror(errno));
return -1; return -1;
} }
tsdbError(
"id %d: set up tsdb environment succeed! cacheBlockSize %d, totalBlocks %d, maxTables %d, daysPerFile %d, keep "
"%d, minRowsPerFileBlock %d, maxRowsPerFileBlock %d, precision %d, compression%d",
pRepo->config.tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
return 0; return 0;
} }
...@@ -811,7 +851,8 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY ...@@ -811,7 +851,8 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
if (pTable == NULL) { if (pTable == NULL) {
tsdbError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid); tsdbError("id %d: failed to get table for insert, uid:%" PRIu64 ", tid:%d", pRepo->config.tsdbId, pBlock->uid,
pBlock->tid);
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
static int tsdbFreeTable(STable *pTable); static int tsdbFreeTable(STable *pTable);
static int32_t tsdbCheckTableCfg(STableCfg *pCfg); static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx); static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbEstimateTableEncodeSize(STable *pTable); static int tsdbEstimateTableEncodeSize(STable *pTable);
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable);
/** /**
* Encode a TSDB table object as a binary content * Encode a TSDB table object as a binary content
...@@ -375,21 +375,9 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { ...@@ -375,21 +375,9 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid); STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
if (pTable == NULL) return -1; if (pTable == NULL) return -1;
if (pTable->type == TSDB_SUPER_TABLE) { if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
// TODO: implement drop super table
return -1;
} else {
pMeta->tables[pTable->tableId.tid] = NULL;
pMeta->nTables--;
assert(pMeta->nTables >= 0);
if (pTable->type == TSDB_CHILD_TABLE) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
tsdbFreeTable(pTable);
}
return 0; return 0;
} }
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
...@@ -445,10 +433,12 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -445,10 +433,12 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
if (pMeta->superList == NULL) { if (pMeta->superList == NULL) {
pMeta->superList = pTable; pMeta->superList = pTable;
pTable->next = NULL; pTable->next = NULL;
pTable->prev = NULL;
} else { } else {
STable *pTemp = pMeta->superList; pTable->next = pMeta->superList;
pTable->prev = NULL;
pTable->next->prev = pTable;
pMeta->superList = pTable; pMeta->superList = pTable;
pTable->next = pTemp;
} }
} else { } else {
// add non-super table to the array // add non-super table to the array
...@@ -467,22 +457,50 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { ...@@ -467,22 +457,50 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
} }
return tsdbAddTableIntoMap(pMeta, pTable); if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable), sizeof(pTable)) < 0) {
return -1;
}
return 0;
} }
// static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
// // TODO if (pTable->type == TSDB_SUPER_TABLE) {
// return 0; SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
// } while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { pMeta->tables[tTable->tableId.tid] = NULL;
// TODO: add the table to the map taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
int64_t uid = pTable->tableId.uid; pMeta->nTables--;
if (taosHashPut(pMeta->map, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) { tsdbFreeTable(tTable);
return -1; }
tSkipListDestroyIter(pIter);
// TODO: Remove the table from the list
if (pTable->prev != NULL) {
pTable->prev->next = pTable->next;
if (pTable->next != NULL) {
pTable->next->prev = pTable->prev;
}
} else {
pMeta->superList = pTable->next;
}
} else {
pMeta->tables[pTable->tableId.tid] = NULL;
if (pTable->type == TSDB_CHILD_TABLE) {
tsdbRemoveTableFromIndex(pMeta, pTable);
} }
pMeta->nTables--;
}
tsdbFreeTable(pTable);
taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
return 0; return 0;
} }
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL); assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid); STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册