未验证 提交 cb1602f7 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1385 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -51,19 +51,21 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes); ...@@ -51,19 +51,21 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
// ----------------- TSDB SCHEMA DEFINITION // ----------------- TSDB SCHEMA DEFINITION
typedef struct { typedef struct {
int numOfCols; // Number of columns appended int numOfCols; // Number of columns appended
int totalCols; // Total columns allocated int padding; // Total columns allocated
STColumn columns[]; STColumn columns[];
} STSchema; } STSchema;
#define schemaNCols(s) ((s)->numOfCols) #define schemaNCols(s) ((s)->numOfCols)
#define schemaTCols(s) ((s)->totalCols)
#define schemaColAt(s, i) ((s)->columns + i) #define schemaColAt(s, i) ((s)->columns + i)
STSchema *tdNewSchema(int32_t nCols); STSchema *tdNewSchema(int32_t nCols);
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes); int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema); STSchema *tdDupSchema(STSchema *pSchema);
void tdFreeSchema(STSchema *pSchema); void tdFreeSchema(STSchema *pSchema);
void tdUpdateSchema(STSchema *pSchema); void tdUpdateSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
// ----------------- Data row structure // ----------------- Data row structure
...@@ -99,33 +101,6 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol); ...@@ -99,33 +101,6 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
void tdDataRowReset(SDataRow row, STSchema *pSchema); void tdDataRowReset(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
/* Data rows definition, the format of it is like below:
* +---------+-----------------------+--------+-----------------------+
* | int32_t | | | |
* +---------+-----------------------+--------+-----------------------+
* | len | SDataRow | .... | SDataRow |
* +---------+-----------------------+--------+-----------------------+
*/
typedef void *SDataRows;
#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t)
#define dataRowsLen(rs) (*(int32_t *)(rs))
#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l))
#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t))
void tdDataRowsAppendRow(SDataRows rows, SDataRow row);
// Data rows iterator
typedef struct {
int32_t totalLen;
int32_t len;
SDataRow row;
} SDataRowsIter;
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter);
SDataRow tdDataRowsNext(SDataRowsIter *pIter);
/* Data column definition /* Data column definition
* +---------+---------+-----------------------+ * +---------+---------+-----------------------+
* | int32_t | int32_t | | * | int32_t | int32_t | |
......
...@@ -91,10 +91,9 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) { ...@@ -91,10 +91,9 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
STSchema *tdNewSchema(int32_t nCols) { STSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols; int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
STSchema *pSchema = (STSchema *)malloc(size); STSchema *pSchema = (STSchema *)calloc(1, size);
if (pSchema == NULL) return NULL; if (pSchema == NULL) return NULL;
pSchema->numOfCols = 0; pSchema->numOfCols = 0;
pSchema->totalCols = nCols;
return pSchema; return pSchema;
} }
...@@ -102,8 +101,8 @@ STSchema *tdNewSchema(int32_t nCols) { ...@@ -102,8 +101,8 @@ STSchema *tdNewSchema(int32_t nCols) {
/** /**
* Append a column to the schema * Append a column to the schema
*/ */
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes) { int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
if (pSchema->numOfCols >= pSchema->totalCols) return -1; // if (pSchema->numOfCols >= pSchema->totalCols) return -1;
if (!isValidDataType(type, 0)) return -1; if (!isValidDataType(type, 0)) return -1;
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema)); STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
...@@ -159,6 +158,53 @@ void tdUpdateSchema(STSchema *pSchema) { ...@@ -159,6 +158,53 @@ void tdUpdateSchema(STSchema *pSchema) {
} }
} }
/**
* Return the size of encoded schema
*/
int tdGetSchemaEncodeSize(STSchema *pSchema) {
return sizeof(STSchema) + schemaNCols(pSchema) * (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) +
T_MEMBER_SIZE(STColumn, bytes));
}
/**
* Encode a schema to dst, and return the next pointer
*/
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type);
T_APPEND_MEMBER(dst, pCol, STColumn, colId);
T_APPEND_MEMBER(dst, pCol, STColumn, bytes);
}
return dst;
}
/**
* Decode a schema from a binary.
*/
STSchema *tdDecodeSchema(void **psrc) {
int numOfCols = 0;
T_READ_MEMBER(*psrc, int, numOfCols);
STSchema *pSchema = tdNewSchema(numOfCols);
if (pSchema == NULL) return NULL;
for (int i = 0; i < numOfCols; i++) {
int8_t type = 0;
int16_t colId = 0;
int32_t bytes = 0;
T_READ_MEMBER(*psrc, int8_t, type);
T_READ_MEMBER(*psrc, int16_t, colId);
T_READ_MEMBER(*psrc, int32_t, bytes);
tdSchemaAppendCol(pSchema, type, colId, bytes);
}
return pSchema;
}
/** /**
* Initialize a data row * Initialize a data row
*/ */
...@@ -234,6 +280,8 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) { ...@@ -234,6 +280,8 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
dataRowFLen(row) += TYPE_BYTES[colType(pCol)]; dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
break; break;
} }
return 0;
} }
void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); } void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); }
...@@ -246,40 +294,6 @@ SDataRow tdDataRowDup(SDataRow row) { ...@@ -246,40 +294,6 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow; return trow;
} }
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
}
// Initialize the iterator
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) {
if (pIter == NULL) return;
pIter->totalLen = dataRowsLen(rows);
if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) {
pIter->row = NULL;
return;
}
pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN);
pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row);
}
// Get the next row in Rows
SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
SDataRow row = pIter->row;
if (row == NULL) return NULL;
if (pIter->len >= pIter->totalLen) {
pIter->row = NULL;
} else {
pIter->row = (char *)row + dataRowLen(row);
pIter->len += dataRowLen(row);
}
return row;
}
/** /**
* Return the first part length of a data row for a schema * Return the first part length of a data row for a schema
*/ */
......
...@@ -82,6 +82,17 @@ extern const int32_t TYPE_BYTES[11]; ...@@ -82,6 +82,17 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_TIME_PRECISION_MILLI_STR "ms" #define TSDB_TIME_PRECISION_MILLI_STR "ms"
#define TSDB_TIME_PRECISION_MICRO_STR "us" #define TSDB_TIME_PRECISION_MICRO_STR "us"
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
#define T_APPEND_MEMBER(dst, ptr, type, member) \
do {\
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\
} while(0)
#define T_READ_MEMBER(src, type, target) \
do { \
(target) = *(type *)(src); \
(src) = (void *)((char *)src + sizeof(type));\
} while(0)
#define TSDB_KEYSIZE sizeof(TSKEY) #define TSDB_KEYSIZE sizeof(TSKEY)
......
...@@ -35,7 +35,7 @@ extern "C" { ...@@ -35,7 +35,7 @@ extern "C" {
// ---------- TSDB TABLE DEFINITION // ---------- TSDB TABLE DEFINITION
typedef struct STable { typedef struct STable {
TSDB_TABLE_TYPE type; int8_t type;
STableId tableId; STableId tableId;
int32_t superUid; // Super table UID int32_t superUid; // Super table UID
int32_t sversion; int32_t sversion;
......
...@@ -25,15 +25,21 @@ extern "C" { ...@@ -25,15 +25,21 @@ extern "C" {
#define TSDB_META_FILE_NAME "META" #define TSDB_META_FILE_NAME "META"
#define TSDB_META_HASH_FRACTION 1.1 #define TSDB_META_HASH_FRACTION 1.1
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct { typedef struct {
int fd; // File descriptor int fd; // File descriptor
int nDel; // number of deletions int nDel; // number of deletions
int nRecord; // Number of records int tombSize; // deleted size
int64_t size; // Total file size int64_t size; // Total file size
void * map; // Map from uid ==> position void * map; // Map from uid ==> position
iterFunc iFunc;
afterFunc aFunc;
void * appH;
} SMetaFile; } SMetaFile;
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables); SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid); int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen); int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
......
...@@ -77,8 +77,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); ...@@ -77,8 +77,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbOpenMetaFile(char *tsdbDir);
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...@@ -219,25 +219,25 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { ...@@ -219,25 +219,25 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return NULL; return NULL;
} }
int fd = tsdbOpenMetaFile(tsdbDir); pRepo->rootDir = strdup(tsdbDir);
if (fd < 0) {
free(pRepo);
return NULL;
}
if (tsdbRecoverRepo(fd, &(pRepo->config)) < 0) { tsdbRestoreCfg(pRepo, &(pRepo->config));
close(fd);
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables);
if (pRepo->tsdbMeta == NULL) {
free(pRepo->rootDir);
free(pRepo); free(pRepo);
return NULL; return NULL;
} }
pRepo->tsdbCache = tsdbInitCache(5); pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize);
if (pRepo->tsdbCache == NULL) { if (pRepo->tsdbCache == NULL) {
// TODO: deal with error tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir);
free(pRepo);
return NULL; return NULL;
} }
pRepo->rootDir = strdup(tsdbDir);
pRepo->state = TSDB_REPO_STATE_ACTIVE; pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo; return (tsdb_repo_t *)pRepo;
...@@ -459,7 +459,7 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { ...@@ -459,7 +459,7 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL; pIter->pBlock = NULL;
} else { } else {
pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk); pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk));
} }
return pBlock; return pBlock;
...@@ -623,12 +623,6 @@ static int tsdbOpenMetaFile(char *tsdbDir) { ...@@ -623,12 +623,6 @@ static int tsdbOpenMetaFile(char *tsdbDir) {
return 0; return 0;
} }
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) {
// TODO: read tsdb configuration from file
// recover tsdb meta
return 0;
}
static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// TODO // TODO
int32_t level = 0; int32_t level = 0;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
static int tsdbFreeTable(STable *pTable); static int tsdbFreeTable(STable *pTable);
static int32_t tsdbCheckTableCfg(STableCfg *pCfg); static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
...@@ -39,8 +39,21 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) { ...@@ -39,8 +39,21 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
void *ret = malloc(*contLen); void *ret = malloc(*contLen);
if (ret == NULL) return NULL; if (ret == NULL) return NULL;
// TODO: encode the object to the memory void *ptr = ret;
{} T_APPEND_MEMBER(ptr, pTable, STable, type);
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
T_APPEND_MEMBER(ptr, pTable, STable, sversion);
if (pTable->type == TSDB_SUPER_TABLE) {
ptr = tdEncodeSchema(ptr, pTable->schema);
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) {
dataRowCpy(ptr, pTable->tagVal);
} else {
ptr = tdEncodeSchema(ptr, pTable->schema);
}
return ret; return ret;
} }
...@@ -59,8 +72,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) { ...@@ -59,8 +72,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
STable *pTable = (STable *)calloc(1, sizeof(STable)); STable *pTable = (STable *)calloc(1, sizeof(STable));
if (pTable == NULL) return NULL; if (pTable == NULL) return NULL;
{ void *ptr = cont;
// TODO recover from the binary content T_READ_MEMBER(ptr, int8_t, pTable->type);
T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid);
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
T_READ_MEMBER(ptr, int32_t, pTable->superUid);
T_READ_MEMBER(ptr, int32_t, pTable->sversion);
if (pTable->type == TSDB_SUPER_TABLE) {
pTable->schema = tdDecodeSchema(&ptr);
pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) {
pTable->tagVal = tdDataRowDup(ptr);
} else {
pTable->schema = tdDecodeSchema(&ptr);
} }
return pTable; return pTable;
...@@ -70,6 +95,36 @@ void *tsdbFreeEncode(void *cont) { ...@@ -70,6 +95,36 @@ void *tsdbFreeEncode(void *cont) {
if (cont != NULL) free(cont); if (cont != NULL) free(cont);
} }
int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
STable *pTable = tsdbDecodeTable(cont, contLen);
if (pTable == NULL) return -1;
if (pTable->type == TSDB_SUPER_TABLE) {
pTable->content.pIndex =
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey);
} else {
pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
}
tsdbAddTableToMeta(pMeta, pTable, false);
return 0;
}
void tsdbOrgMeta(void *pHandle) {
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
tsdbAddTableIntoIndex(pMeta, pTable);
}
}
}
/** /**
* Initialize the meta handle * Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER * ASSUMPTIONS: VALID PARAMETER
...@@ -94,7 +149,7 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) { ...@@ -94,7 +149,7 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) {
return NULL; return NULL;
} }
pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables); pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables, tsdbRestoreTable, tsdbOrgMeta, pMeta);
if (pMeta->mfh == NULL) { if (pMeta->mfh == NULL) {
taosHashCleanup(pMeta->map); taosHashCleanup(pMeta->map);
free(pMeta->tables); free(pMeta->tables);
...@@ -186,8 +241,21 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -186,8 +241,21 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
} }
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
if (newSuper) tsdbAddTableToMeta(pMeta, super); // Register to meta
tsdbAddTableToMeta(pMeta, table); if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
tsdbAddTableToMeta(pMeta, table, true);
// Write to meta file
int bufLen = 0;
if (newSuper) {
void *buf = tsdbEncodeTable(super, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
tsdbFreeEncode(buf);
}
void *buf = tsdbEncodeTable(table, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
tsdbFreeEncode(buf);
return 0; return 0;
} }
...@@ -268,7 +336,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) { ...@@ -268,7 +336,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
return *(STable **)ptr; return *(STable **)ptr;
} }
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list // add super table to the linked list
if (pMeta->superList == NULL) { if (pMeta->superList == NULL) {
...@@ -318,8 +386,22 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -318,8 +386,22 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
} }
static int tsdbEstimateTableEncodeSize(STable *pTable) { static int tsdbEstimateTableEncodeSize(STable *pTable) {
// TODO int size = 0;
return 0; size += T_MEMBER_SIZE(STable, type);
size += T_MEMBER_SIZE(STable, tableId);
size += T_MEMBER_SIZE(STable, superUid);
size += T_MEMBER_SIZE(STable, sversion);
if (pTable->type == TSDB_SUPER_TABLE) {
size += tdGetSchemaEncodeSize(pTable->schema);
size += tdGetSchemaEncodeSize(pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) {
size += dataRowLen(pTable->tagVal);
} else {
size += tdGetSchemaEncodeSize(pTable->schema);
}
return size;
} }
static char *getTupleKey(const void * data) { static char *getTupleKey(const void * data) {
......
...@@ -19,11 +19,14 @@ ...@@ -19,11 +19,14 @@
#include "hash.h" #include "hash.h"
#include "tsdbMetaFile.h" #include "tsdbMetaFile.h"
#define TSDB_META_FILE_VERSION_MAJOR 1
#define TSDB_META_FILE_VERSION_MINOR 0
#define TSDB_META_FILE_HEADER_SIZE 512 #define TSDB_META_FILE_HEADER_SIZE 512
typedef struct { typedef struct {
int32_t offset; int32_t offset;
int32_t size; int32_t size;
int64_t uid;
} SRecordInfo; } SRecordInfo;
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
...@@ -32,14 +35,20 @@ static int32_t tsdbWriteMetaHeader(int fd); ...@@ -32,14 +35,20 @@ static int32_t tsdbWriteMetaHeader(int fd);
static int tsdbCreateMetaFile(char *fname); static int tsdbCreateMetaFile(char *fname);
static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh); static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh);
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) { SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH) {
// TODO
char fname[128] = "\0"; char fname[128] = "\0";
if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL; if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL;
SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile)); SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile));
if (mfh == NULL) return NULL; if (mfh == NULL) return NULL;
mfh->iFunc = iFunc;
mfh->aFunc = aFunc;
mfh->appH = appH;
mfh->nDel = 0;
mfh->tombSize = 0;
mfh->size = 0;
// OPEN MAP // OPEN MAP
mfh->map = mfh->map =
taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
...@@ -56,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) { ...@@ -56,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) {
free(mfh); free(mfh);
return NULL; return NULL;
} }
mfh->size += TSDB_META_FILE_HEADER_SIZE;
} else { // file exists, recover from file } else { // file exists, recover from file
if (tsdbRestoreFromMetaFile(fname, mfh) < 0) { if (tsdbRestoreFromMetaFile(fname, mfh) < 0) {
taosHashCleanup(mfh->map); taosHashCleanup(mfh->map);
...@@ -74,7 +84,8 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -74,7 +84,8 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
SRecordInfo info; SRecordInfo info;
info.offset = mfh->size; info.offset = mfh->size;
info.size = contLen; // TODO: Here is not correct info.size = contLen;
info.uid = uid;
mfh->size += (contLen + sizeof(SRecordInfo)); mfh->size += (contLen + sizeof(SRecordInfo));
...@@ -83,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -83,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
} }
// TODO: make below a function to implement // TODO: make below a function to implement
if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) { if (lseek(mfh->fd, info.offset, SEEK_SET) < 0) {
return -1; return -1;
} }
...@@ -97,7 +108,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -97,7 +108,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
fsync(mfh->fd); fsync(mfh->fd);
mfh->nRecord++; mfh->tombSize++;
return 0; return 0;
} }
...@@ -182,6 +193,15 @@ static int32_t tsdbCheckMetaHeader(int fd) { ...@@ -182,6 +193,15 @@ static int32_t tsdbCheckMetaHeader(int fd) {
static int32_t tsdbWriteMetaHeader(int fd) { static int32_t tsdbWriteMetaHeader(int fd) {
// TODO: write the meta file header to file // TODO: write the meta file header to file
char head[TSDB_META_FILE_HEADER_SIZE] = "\0";
sprintf(head, "version: %d.%d", TSDB_META_FILE_VERSION_MAJOR, TSDB_META_FILE_VERSION_MINOR);
write(fd, (void *)head, TSDB_META_FILE_HEADER_SIZE);
return 0;
}
static int32_t tsdbReadMetaHeader(int fd) {
lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET);
return 0; return 0;
} }
...@@ -218,8 +238,44 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { ...@@ -218,8 +238,44 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
return -1; return -1;
} }
mfh->size += TSDB_META_FILE_HEADER_SIZE;
mfh->fd = fd; mfh->fd = fd;
// TODO: iterate to read the meta file to restore the meta data
void *buf = NULL;
int buf_size = 0;
SRecordInfo info;
while (1) {
if (read(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) == 0) break;
if (info.offset < 0) {
mfh->size += (info.size + sizeof(SRecordInfo));
mfh->tombSize += (info.size + sizeof(SRecordInfo));
lseek(mfh->fd, info.size, SEEK_CUR);
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
mfh->tombSize = mfh->tombSize + sizeof(SRecordInfo) + info.size;
} else {
if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
if (buf) free(buf);
return -1;
}
buf = realloc(buf, info.size);
if (buf == NULL) return -1;
if (read(mfh->fd, buf, info.size) < 0) {
if (buf) free(buf);
return -1;
}
(*mfh->iFunc)(mfh->appH, buf, info.size);
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
}
}
(*mfh->aFunc)(mfh->appH);
if (buf) free(buf);
return 0; return 0;
} }
\ No newline at end of file
...@@ -3,6 +3,44 @@ ...@@ -3,6 +3,44 @@
#include "tsdb.h" #include "tsdb.h"
#include "dataformat.h" #include "dataformat.h"
#include "tsdbMeta.h"
TEST(TsdbTest, tableEncodeDecode) {
STable *pTable = (STable *)malloc(sizeof(STable));
pTable->type = TSDB_NORMAL_TABLE;
pTable->tableId.uid = 987607499877672L;
pTable->tableId.tid = 0;
pTable->superUid = -1;
pTable->sversion = 0;
pTable->tagSchema = NULL;
pTable->tagVal = NULL;
int nCols = 5;
STSchema *schema = tdNewSchema(nCols);
for (int i = 0; i < nCols; i++) {
if (i == 0) {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
} else {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
}
}
pTable->schema = schema;
int bufLen = 0;
void *buf = tsdbEncodeTable(pTable, &bufLen);
STable *tTable = tsdbDecodeTable(buf, bufLen);
ASSERT_EQ(pTable->type, tTable->type);
ASSERT_EQ(pTable->tableId.uid, tTable->tableId.uid);
ASSERT_EQ(pTable->tableId.tid, tTable->tableId.tid);
ASSERT_EQ(pTable->superUid, tTable->superUid);
ASSERT_EQ(pTable->sversion, tTable->sversion);
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
ASSERT_EQ(tTable->content.pData, nullptr);
}
TEST(TsdbTest, createRepo) { TEST(TsdbTest, createRepo) {
STsdbCfg config; STsdbCfg config;
...@@ -65,3 +103,7 @@ TEST(TsdbTest, createRepo) { ...@@ -65,3 +103,7 @@ TEST(TsdbTest, createRepo) {
int k = 0; int k = 0;
} }
TEST(TsdbTest, openRepo) {
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0");
ASSERT_NE(pRepo, nullptr);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册