提交 b7b4148b 编写于 作者: H Hongze Cheng

TD-353

上级 adc6a190
...@@ -48,6 +48,7 @@ typedef struct STable { ...@@ -48,6 +48,7 @@ typedef struct STable {
ETableType type; ETableType type;
tstr* name; // NOTE: there a flexible string here tstr* name; // NOTE: there a flexible string here
STableId tableId; STableId tableId;
uint64_t suid;
STable* pSuper; // super table pointer STable* pSuper; // super table pointer
uint8_t numOfSchemas; uint8_t numOfSchemas;
STSchema schema[TSDB_MAX_TABLE_SCHEMAS]; STSchema schema[TSDB_MAX_TABLE_SCHEMAS];
...@@ -59,6 +60,7 @@ typedef struct STable { ...@@ -59,6 +60,7 @@ typedef struct STable {
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
char* sql; char* sql;
void* cqhandle; void* cqhandle;
T_REF_DECLARE();
} STable; } STable;
typedef struct { typedef struct {
...@@ -69,8 +71,6 @@ typedef struct { ...@@ -69,8 +71,6 @@ typedef struct {
SList* superList; SList* superList;
SHashObj* uidMap; SHashObj* uidMap;
SKVStore* pStore; SKVStore* pStore;
int maxRowBytes;
int maxCols;
} STsdbMeta; } STsdbMeta;
// ------------------ tsdbBuffer.c // ------------------ tsdbBuffer.c
...@@ -107,6 +107,8 @@ typedef struct { ...@@ -107,6 +107,8 @@ typedef struct {
STableData** tData; STableData** tData;
SList* actList; SList* actList;
SList* bufBlockList; SList* bufBlockList;
int maxCols;
int maxRowBytes;
} SMemTable; } SMemTable;
// ------------------ tsdbFile.c // ------------------ tsdbFile.c
...@@ -278,7 +280,7 @@ typedef struct { ...@@ -278,7 +280,7 @@ typedef struct {
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data #define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TALBE_UID(t) (t)->tableId.uid #define TALBE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid #define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->superUid #define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey #define TABLE_LASTKEY(t) (t)->lastKey
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
......
...@@ -20,69 +20,64 @@ ...@@ -20,69 +20,64 @@
#include "tsdbMain.h" #include "tsdbMain.h"
#include "tskiplist.h" #include "tskiplist.h"
#define TSDB_SUPER_TABLE_SL_LEVEL 5
// ------------------ OUTER FUNCTIONS ------------------ // ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
STable * super = NULL;
if (tsdbCheckTableCfg(pCfg) < 0) return -1; STable * table = NULL;
int newSuper = 0;
STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid); STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid);
if (pTable != NULL) { if (pTable != NULL) {
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name), tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
pTable->tableId.tid, pTable->tableId.uid); TABLE_TID(pTable), TALBE_UID(pTable));
return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; return TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
} }
STable *super = NULL;
int newSuper = 0;
if (pCfg->type == TSDB_CHILD_TABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
super = tsdbGetTableByUid(pMeta, pCfg->superUid); super = tsdbGetTableByUid(pMeta, pCfg->superUid);
if (super == NULL) { // super table not exists, try to create it if (super == NULL) { // super table not exists, try to create it
newSuper = 1; newSuper = 1;
super = tsdbNewTable(pCfg, true); super = tsdbNewTable(pCfg, true);
if (super == NULL) return -1; if (super == NULL) goto _err;
} else { } else {
// TODO
if (super->type != TSDB_SUPER_TABLE) return -1; if (super->type != TSDB_SUPER_TABLE) return -1;
if (super->tableId.uid != pCfg->superUid) return -1; if (super->tableId.uid != pCfg->superUid) return -1;
tsdbUpdateTable(pMeta, super, pCfg); tsdbUpdateTable(pMeta, super, pCfg);
} }
} }
STable *table = tsdbNewTable(pCfg, false); table = tsdbNewTable(pCfg, false);
if (table == NULL) { if (table == NULL) goto _err;
if (newSuper) {
tsdbFreeTable(super);
return -1;
}
}
table->lastKey = TSKEY_INITIAL_VAL;
// Register to meta // Register to meta
if (newSuper) { if (newSuper) {
tsdbAddTableToMeta(pMeta, super, true); if (tsdbAddTableToMeta(pRepo, super, true) < 0) goto _err;
tsdbTrace("vgId:%d, super table %s is created! uid:%" PRId64, pRepo->config.tsdbId, varDataVal(super->name),
super->tableId.uid);
} }
tsdbAddTableToMeta(pMeta, table, true); if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err;
tsdbTrace("vgId:%d, table %s is created! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(table->name),
table->tableId.tid, table->tableId.uid);
// Write to meta file // // Write to meta file
int bufLen = 0; // int bufLen = 0;
char *buf = malloc(1024*1024); // char *buf = malloc(1024 * 1024);
if (newSuper) { // if (newSuper) {
tsdbEncodeTable(super, buf, &bufLen); // tsdbEncodeTable(super, buf, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); // tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
} // }
tsdbEncodeTable(table, buf, &bufLen); // tsdbEncodeTable(table, buf, &bufLen);
tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); // tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
tfree(buf); // tfree(buf);
return 0; return 0;
_err:
tsdbFreeTable(super);
tsdbFreeTable(table);
return -1;
} }
int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
...@@ -412,6 +407,40 @@ char *getTSTupleKey(const void * data) { ...@@ -412,6 +407,40 @@ char *getTSTupleKey(const void * data) {
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE); return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
} }
int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
int code = pthread_rwlock_wrlock(&(pRepo->tsdbMeta->rwLock));
if (code != 0) {
tsdbError("vgId:%d failed to write lock TSDB meta since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbRLockRepoMeta(STsdbRepo *pRepo) {
int code = pthread_rwlock_rdlock(&(pRepo->tsdbMeta->rwLock));
if (code != 0) {
tsdbError("vgId:%d failed to read lock TSDB meta since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
int code = pthread_rwlock_unlock(&(pRepo->tsdbMeta->rwLock));
if (code != 0) {
tsdbError("vgId:%d failed to unlock TSDB meta since %s", REPO_ID(pRepo), strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
// ------------------ LOCAL FUNCTIONS ------------------ // ------------------ LOCAL FUNCTIONS ------------------
static void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { static void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
...@@ -568,19 +597,8 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -568,19 +597,8 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
goto _err; goto _err;
} }
pTable->type = pCfg->type;
pTable->numOfSchemas = 0;
if (isSuper) { if (isSuper) {
pTable->type = TSDB_SUPER_TABLE; pTable->type = TSDB_SUPER_TABLE;
pTable->tableId.uid = pCfg->superUid;
pTable->tableId.tid = -1;
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema);
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN - 1); tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN - 1);
pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1); pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1);
if (pTable->name == NULL) { if (pTable->name == NULL) {
...@@ -588,20 +606,31 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -588,20 +606,31 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
goto _err; goto _err;
} }
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize); STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize);
TALBE_UID(pTable) = pCfg->superUid;
TABLE_TID(pTable) = -1;
TABLE_SUID(pTable) = -1;
pTable->pSuper = NULL;
pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
if (pTable->tagSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pTable->tagVal = NULL;
STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0); STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 0, pTable->pIndex =
getTagIndexKey); // Allow duplicate key, no lock tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1, getTagIndexKey);
if (pTable->pIndex == NULL) { if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
} }
} else { } else {
pTable->type = pCfg->type; pTable->type = pCfg->type;
pTable->tableId.uid = pCfg->tableId.uid;
pTable->tableId.tid = pCfg->tableId.tid;
pTable->lastKey = TSKEY_INITIAL_VAL;
tsize = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN - 1); tsize = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN - 1);
pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1); pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1);
if (pTable->name == NULL) { if (pTable->name == NULL) {
...@@ -609,25 +638,39 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { ...@@ -609,25 +638,39 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
goto _err; goto _err;
} }
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->name, tsize); STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->name, tsize);
TALBE_UID(pTable) = pCfg->tableId.uid;
TABLE_TID(pTable) = pCfg->tableId.tid;
if (pCfg->type == TSDB_CHILD_TABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
pTable->superUid = pCfg->superUid; TABLE_SUID(pTable) = pCfg->superUid;
pTable->tagVal = tdKVRowDup(pCfg->tagValues); pTable->tagVal = tdKVRowDup(pCfg->tagValues);
if (pTable->tagVal == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
} else { } else {
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS); TABLE_SUID(pTable) = -1;
pTable->numOfSchemas = 1; pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema); pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (pCfg->type == TSDB_NORMAL_TABLE) { if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) {
pTable->superUid = -1;
} else {
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
pTable->superUid = -1;
pTable->sql = strdup(pCfg->sql); pTable->sql = strdup(pCfg->sql);
if (pTable->sql == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
} }
} }
pTable->lastKey = TSKEY_INITIAL_VAL;
} }
T_REF_INC(pTable);
return pTable; return pTable;
_err: _err:
...@@ -635,34 +678,24 @@ _err: ...@@ -635,34 +678,24 @@ _err:
return NULL; return NULL;
} }
static int tsdbFreeTable(STable *pTable) { static void tsdbFreeTable(STable *pTable) {
if (pTable == NULL) return 0; if (pTable) {
tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
tdFreeSchema(pTable->schema[i]);
}
if (pTable->type == TSDB_CHILD_TABLE) { if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
kvRowFree(pTable->tagVal); tdFreeSchema(pTable->tagSchema);
} else { }
if (pTable->schema) {
for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]);
free(pTable->schema);
} }
}
if (pTable->type == TSDB_STREAM_TABLE) { kvRowFree(pTable->tagVal);
tfree(pTable->sql);
}
// Free content
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
tdFreeSchema(pTable->tagSchema);
tSkipListDestroy(pTable->pIndex); tSkipListDestroy(pTable->pIndex);
tfree(pTable->sql);
} }
tsdbFreeMemTable(pTable->mem);
tsdbFreeMemTable(pTable->imem);
tfree(pTable->name);
free(pTable);
return 0;
} }
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
...@@ -677,48 +710,55 @@ static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { ...@@ -677,48 +710,55 @@ static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pTable->type == TSDB_SUPER_TABLE) {
// add super table to the linked list if (addIdx && tsdbWLockRepoMeta(pRepo) < 0) {
if (pMeta->superList == NULL) { tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
pMeta->superList = pTable; tstrerror(terrno));
pTable->next = NULL; return -1;
pTable->prev = NULL; }
} else {
pTable->next = pMeta->superList; if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
pTable->prev = NULL; if (tdListAppend(pMeta->superList, (void *)(&pTable)) < 0) {
pTable->next->prev = pTable; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
pMeta->superList = pTable; tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
goto _err;
} }
} else { } else {
// add non-super table to the array pMeta->tables[TABLE_TID(pTable)] = pTable;
pMeta->tables[pTable->tableId.tid] = pTable; if (TABLE_TID(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbTrace("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
} TABLE_CHAR_NAME(pTable), tstrerror(terrno));
if (pTable->type == TSDB_STREAM_TABLE && addIdx) { goto _err;
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); }
} }
pMeta->nTables++; pMeta->nTables++;
}
// Update the pMeta->maxCols and pMeta->maxRowBytes return 0;
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE) {
if (schemaNCols(pTable->schema[pTable->numOfSchemas - 1]) > pMeta->maxCols)
pMeta->maxCols = schemaNCols(pTable->schema[pTable->numOfSchemas - 1]);
int bytes = dataRowMaxBytesFromSchema(pTable->schema[pTable->numOfSchemas - 1]);
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
} }
if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable), sizeof(pTable)) < 0) { if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable),
sizeof(pTable)) < 0) {
return -1; return -1;
} }
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TALBE_UID(pTable));
return 0; return 0;
_err:
tsdbRemoveTableFromIndex()
if (addIdx) tsdbUnlockRepoMeta(pRepo);
return -1;
} }
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx) { static int tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx) {
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册