提交 35b52a5f 编写于 作者: H hzcheng

TD-34

上级 c0578258
...@@ -81,11 +81,13 @@ STSchema *tdDecodeSchema(void **psrc); ...@@ -81,11 +81,13 @@ STSchema *tdDecodeSchema(void **psrc);
*/ */
typedef void *SDataRow; typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t)) #define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
#define dataRowLen(r) (*(int32_t *)(r)) #define dataRowLen(r) (*(int32_t *)(r))
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t))) #define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE) #define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l)) #define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
#define dataRowIdx(r, i) ((char *)(r) + i) #define dataRowIdx(r, i) ((char *)(r) + i)
......
...@@ -33,20 +33,25 @@ extern "C" { ...@@ -33,20 +33,25 @@ extern "C" {
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL) #define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfPoints;
void * pData;
} SMemTable;
// ---------- TSDB TABLE DEFINITION // ---------- TSDB TABLE DEFINITION
typedef struct STable { typedef struct STable {
int8_t type; int8_t type;
STableId tableId; STableId tableId;
int32_t superUid; // Super table UID int32_t superUid; // Super table UID
int32_t sversion; int32_t sversion;
STSchema *schema; STSchema * schema;
STSchema *tagSchema; STSchema * tagSchema;
SDataRow tagVal; SDataRow tagVal;
union { SMemTable * mem;
void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data SMemTable * imem;
void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
} content;
void * iData; // Skiplist to commit
void * eventHandler; // TODO void * eventHandler; // TODO
void * streamHandler; // TODO void * streamHandler; // TODO
struct STable *next; // TODO: remove the next struct STable *next; // TODO: remove the next
...@@ -94,8 +99,9 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta); ...@@ -94,8 +99,9 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
char *getTupleKey(const void * data);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -311,10 +311,9 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { ...@@ -311,10 +311,9 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
// Loop to move pData to iData // Loop to move pData to iData
for (int i = 0; i < pRepo->config.maxTables; i++) { for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pRepo->tsdbMeta->tables[i]; STable *pTable = pRepo->tsdbMeta->tables[i];
if (pTable != NULL) { if (pTable != NULL && pTable->mem != NULL) {
void *pData = pTable->content.pData; pTable->imem = pTable->mem;
pTable->content.pData = NULL; pTable->mem = NULL;
pTable->iData = pData;
} }
} }
// Loop to move mem to imem // Loop to move mem to imem
...@@ -669,7 +668,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -669,7 +668,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
int32_t level = 0; int32_t level = 0;
int32_t headSize = 0; int32_t headSize = 0;
tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize); if (pTable->mem == NULL) {
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
if (pTable->mem == NULL) return -1;
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
}
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
// Copy row into the memory // Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row)); SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row));
...@@ -681,7 +686,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -681,7 +686,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
dataRowCpy(SL_GET_NODE_DATA(pNode), row); dataRowCpy(SL_GET_NODE_DATA(pNode), row);
// Insert the skiplist node into the data // Insert the skiplist node into the data
tsdbInsertRowToTableImpl(pNode, pTable); tSkipListPut(pTable->mem->pData, pNode);
return 0; return 0;
} }
...@@ -712,7 +717,7 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -712,7 +717,7 @@ static void *tsdbCommitToFile(void *arg) {
for (int i = 0; i < pRepo->config.maxTables; i++) { for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue; if (pTable == NULL) continue;
SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData); SSkipListIterator *pIter = tSkipListCreateIter(pTable->imem->pData);
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
SSkipListNode *node = tSkipListIterGet(pIter); SSkipListNode *node = tSkipListIterGet(pIter);
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
......
...@@ -18,7 +18,6 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); ...@@ -18,7 +18,6 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbEstimateTableEncodeSize(STable *pTable); static int tsdbEstimateTableEncodeSize(STable *pTable);
static char * getTupleKey(const void *data);
/** /**
* Encode a TSDB table object as a binary content * Encode a TSDB table object as a binary content
...@@ -102,12 +101,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -102,12 +101,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
if (pTable == NULL) return -1; if (pTable == NULL) return -1;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
pTable->content.pIndex = pTable->pIndex =
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey);
} else { }
pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
}
tsdbAddTableToMeta(pMeta, pTable, false); tsdbAddTableToMeta(pMeta, pTable, false);
...@@ -208,10 +204,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -208,10 +204,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
super->schema = tdDupSchema(pCfg->schema); super->schema = tdDupSchema(pCfg->schema);
super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagSchema = tdDupSchema(pCfg->tagSchema);
super->tagVal = tdDataRowDup(pCfg->tagValues); super->tagVal = tdDataRowDup(pCfg->tagValues);
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
0, getTupleKey); // Allow duplicate key, no lock 0, getTupleKey); // Allow duplicate key, no lock
if (super->content.pIndex == NULL) { if (super->pIndex == NULL) {
tdFreeSchema(super->schema); tdFreeSchema(super->schema);
tdFreeSchema(super->tagSchema); tdFreeSchema(super->tagSchema);
tdFreeDataRow(super->tagVal); tdFreeDataRow(super->tagVal);
...@@ -223,7 +219,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -223,7 +219,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
} }
} }
STable *table = (STable *)malloc(sizeof(STable)); STable *table = (STable *)calloc(1, sizeof(STable));
if (table == NULL) { if (table == NULL) {
if (newSuper) tsdbFreeTable(super); if (newSuper) tsdbFreeTable(super);
return -1; return -1;
...@@ -239,7 +235,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -239,7 +235,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
table->superUid = -1; table->superUid = -1;
table->schema = tdDupSchema(pCfg->schema); table->schema = tdDupSchema(pCfg->schema);
} }
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
// Register to meta // Register to meta
if (newSuper) tsdbAddTableToMeta(pMeta, super, true); if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
...@@ -299,10 +294,10 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { ...@@ -299,10 +294,10 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
return 0; return 0;
} }
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
tSkipListPut(pTable->content.pData, pNode); // tSkipListPut(pTable->mem->pData, pNode);
return 0; // return 0;
} // }
static int tsdbFreeTable(STable *pTable) { static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function // TODO: finish this function
...@@ -314,10 +309,8 @@ static int tsdbFreeTable(STable *pTable) { ...@@ -314,10 +309,8 @@ static int tsdbFreeTable(STable *pTable) {
// Free content // Free content
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
tSkipListDestroy(pTable->content.pIndex); tSkipListDestroy(pTable->pIndex);
} else { }
tSkipListDestroy(pTable->content.pData);
}
free(pTable); free(pTable);
return 0; return 0;
...@@ -404,7 +397,7 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) { ...@@ -404,7 +397,7 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) {
return size; return size;
} }
static char *getTupleKey(const void * data) { char *getTupleKey(const void * data) {
SDataRow row = (SDataRow)data; SDataRow row = (SDataRow)data;
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
......
...@@ -40,7 +40,6 @@ TEST(TsdbTest, tableEncodeDecode) { ...@@ -40,7 +40,6 @@ TEST(TsdbTest, tableEncodeDecode) {
ASSERT_EQ(pTable->superUid, tTable->superUid); ASSERT_EQ(pTable->superUid, tTable->superUid);
ASSERT_EQ(pTable->sversion, tTable->sversion); ASSERT_EQ(pTable->sversion, tTable->sversion);
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
ASSERT_EQ(tTable->content.pData, nullptr);
} }
TEST(TsdbTest, createRepo) { TEST(TsdbTest, createRepo) {
...@@ -72,7 +71,7 @@ TEST(TsdbTest, createRepo) { ...@@ -72,7 +71,7 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// // 3. Loop to write some simple data // // 3. Loop to write some simple data
int nRows = 100; int nRows = 1000;
int rowsPerSubmit = 10; int rowsPerSubmit = 10;
int64_t start_time = 1584081000000; int64_t start_time = 1584081000000;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册