diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 25e8b9ce40af6785e6ecbdee32fbf94d57324d23..8d66c6325a4248fa7d339fd408ef1fae2da30d3d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -48,6 +48,7 @@ typedef struct STable { ETableType type; tstr* name; // NOTE: there a flexible string here STableId tableId; + uint64_t suid; STable* pSuper; // super table pointer uint8_t numOfSchemas; STSchema schema[TSDB_MAX_TABLE_SCHEMAS]; @@ -59,6 +60,7 @@ typedef struct STable { TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure char* sql; void* cqhandle; + T_REF_DECLARE(); } STable; typedef struct { @@ -69,8 +71,6 @@ typedef struct { SList* superList; SHashObj* uidMap; SKVStore* pStore; - int maxRowBytes; - int maxCols; } STsdbMeta; // ------------------ tsdbBuffer.c @@ -107,6 +107,8 @@ typedef struct { STableData** tData; SList* actList; SList* bufBlockList; + int maxCols; + int maxRowBytes; } SMemTable; // ------------------ tsdbFile.c @@ -278,7 +280,7 @@ typedef struct { #define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data #define TALBE_UID(t) (t)->tableId.uid #define TABLE_TID(t) (t)->tableId.tid -#define TABLE_SUID(t) (t)->superUid +#define TABLE_SUID(t) (t)->suid #define TABLE_LASTKEY(t) (t)->lastKey STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 57ab73844479cf46c638f76174cf369b16d530ed..47f803f9156895c751336c42aa325f9c32cde1e1 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -20,69 +20,64 @@ #include "tsdbMain.h" #include "tskiplist.h" +#define TSDB_SUPER_TABLE_SL_LEVEL 5 + // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; - - if (tsdbCheckTableCfg(pCfg) < 0) return -1; + STable * super = NULL; + STable * table = NULL; + int newSuper = 0; STable *pTable = tsdbGetTableByUid(pMeta, pCfg->tableId.uid); if (pTable != NULL) { - tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name), - pTable->tableId.tid, pTable->tableId.uid); + tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + TABLE_TID(pTable), TALBE_UID(pTable)); return TSDB_CODE_TDB_TABLE_ALREADY_EXIST; } - STable *super = NULL; - int newSuper = 0; - if (pCfg->type == TSDB_CHILD_TABLE) { super = tsdbGetTableByUid(pMeta, pCfg->superUid); if (super == NULL) { // super table not exists, try to create it newSuper = 1; super = tsdbNewTable(pCfg, true); - if (super == NULL) return -1; + if (super == NULL) goto _err; } else { + // TODO if (super->type != TSDB_SUPER_TABLE) return -1; if (super->tableId.uid != pCfg->superUid) return -1; tsdbUpdateTable(pMeta, super, pCfg); } } - STable *table = tsdbNewTable(pCfg, false); - if (table == NULL) { - if (newSuper) { - tsdbFreeTable(super); - return -1; - } - } - - table->lastKey = TSKEY_INITIAL_VAL; - + table = tsdbNewTable(pCfg, false); + if (table == NULL) goto _err; + // Register to meta if (newSuper) { - tsdbAddTableToMeta(pMeta, super, true); - tsdbTrace("vgId:%d, super table %s is created! uid:%" PRId64, pRepo->config.tsdbId, varDataVal(super->name), - super->tableId.uid); + if (tsdbAddTableToMeta(pRepo, super, true) < 0) goto _err; } - tsdbAddTableToMeta(pMeta, table, true); - tsdbTrace("vgId:%d, table %s is created! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(table->name), - table->tableId.tid, table->tableId.uid); + if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err; - // Write to meta file - int bufLen = 0; - char *buf = malloc(1024*1024); - if (newSuper) { - tsdbEncodeTable(super, buf, &bufLen); - tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); - } + // // Write to meta file + // int bufLen = 0; + // char *buf = malloc(1024 * 1024); + // if (newSuper) { + // tsdbEncodeTable(super, buf, &bufLen); + // tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen); + // } - tsdbEncodeTable(table, buf, &bufLen); - tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); - tfree(buf); + // tsdbEncodeTable(table, buf, &bufLen); + // tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen); + // tfree(buf); return 0; + +_err: + tsdbFreeTable(super); + tsdbFreeTable(table); + return -1; } int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { @@ -412,6 +407,40 @@ char *getTSTupleKey(const void * data) { return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE); } +int tsdbWLockRepoMeta(STsdbRepo *pRepo) { + int code = pthread_rwlock_wrlock(&(pRepo->tsdbMeta->rwLock)); + if (code != 0) { + tsdbError("vgId:%d failed to write lock TSDB meta since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + return 0; +} + +int tsdbRLockRepoMeta(STsdbRepo *pRepo) { + int code = pthread_rwlock_rdlock(&(pRepo->tsdbMeta->rwLock)); + if (code != 0) { + tsdbError("vgId:%d failed to read lock TSDB meta since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + return 0; +} + +int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { + int code = pthread_rwlock_unlock(&(pRepo->tsdbMeta->rwLock)); + if (code != 0) { + tsdbError("vgId:%d failed to unlock TSDB meta since %s", REPO_ID(pRepo), strerror(code)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + + return 0; +} + + // ------------------ LOCAL FUNCTIONS ------------------ static void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { @@ -568,19 +597,8 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { goto _err; } - pTable->type = pCfg->type; - pTable->numOfSchemas = 0; - if (isSuper) { pTable->type = TSDB_SUPER_TABLE; - pTable->tableId.uid = pCfg->superUid; - pTable->tableId.tid = -1; - pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID; - pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS); - pTable->numOfSchemas = 1; - pTable->schema[0] = tdDupSchema(pCfg->schema); - pTable->tagSchema = tdDupSchema(pCfg->tagSchema); - tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN - 1); pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1); if (pTable->name == NULL) { @@ -588,20 +606,31 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { goto _err; } STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize); - + TALBE_UID(pTable) = pCfg->superUid; + TABLE_TID(pTable) = -1; + TABLE_SUID(pTable) = -1; + pTable->pSuper = NULL; + pTable->numOfSchemas = 1; + pTable->schema[0] = tdDupSchema(pCfg->schema); + if (pTable->schema[0] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + pTable->tagSchema = tdDupSchema(pCfg->tagSchema); + if (pTable->tagSchema == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + pTable->tagVal = NULL; STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0); - pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 0, - getTagIndexKey); // Allow duplicate key, no lock + pTable->pIndex = + tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1, getTagIndexKey); if (pTable->pIndex == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; } } else { pTable->type = pCfg->type; - pTable->tableId.uid = pCfg->tableId.uid; - pTable->tableId.tid = pCfg->tableId.tid; - pTable->lastKey = TSKEY_INITIAL_VAL; - tsize = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN - 1); pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1); if (pTable->name == NULL) { @@ -609,25 +638,39 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { goto _err; } STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->name, tsize); + TALBE_UID(pTable) = pCfg->tableId.uid; + TABLE_TID(pTable) = pCfg->tableId.tid; if (pCfg->type == TSDB_CHILD_TABLE) { - pTable->superUid = pCfg->superUid; + TABLE_SUID(pTable) = pCfg->superUid; pTable->tagVal = tdKVRowDup(pCfg->tagValues); + if (pTable->tagVal == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } } else { - pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS); + TABLE_SUID(pTable) = -1; pTable->numOfSchemas = 1; pTable->schema[0] = tdDupSchema(pCfg->schema); + if (pTable->schema[0] == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } - if (pCfg->type == TSDB_NORMAL_TABLE) { - pTable->superUid = -1; - } else { - ASSERT(pCfg->type == TSDB_STREAM_TABLE); - pTable->superUid = -1; + if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { pTable->sql = strdup(pCfg->sql); + if (pTable->sql == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } } } + + pTable->lastKey = TSKEY_INITIAL_VAL; } + T_REF_INC(pTable); + return pTable; _err: @@ -635,34 +678,24 @@ _err: return NULL; } -static int tsdbFreeTable(STable *pTable) { - if (pTable == NULL) return 0; +static void tsdbFreeTable(STable *pTable) { + if (pTable) { + tfree(TABLE_NAME(pTable)); + if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { + for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { + tdFreeSchema(pTable->schema[i]); + } - if (pTable->type == TSDB_CHILD_TABLE) { - kvRowFree(pTable->tagVal); - } else { - if (pTable->schema) { - for (int i = 0; i < pTable->numOfSchemas; i++) tdFreeSchema(pTable->schema[i]); - free(pTable->schema); + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tdFreeSchema(pTable->tagSchema); + } } - } - if (pTable->type == TSDB_STREAM_TABLE) { - tfree(pTable->sql); - } + kvRowFree(pTable->tagVal); - // Free content - if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { - tdFreeSchema(pTable->tagSchema); tSkipListDestroy(pTable->pIndex); + tfree(pTable->sql); } - - tsdbFreeMemTable(pTable->mem); - tsdbFreeMemTable(pTable->imem); - - tfree(pTable->name); - free(pTable); - return 0; } static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { @@ -677,48 +710,55 @@ static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { return TSDB_CODE_SUCCESS; } -static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { - STsdbRepo *pRepo = (STsdbRepo *)pMeta->pRepo; - if (pTable->type == TSDB_SUPER_TABLE) { - // add super table to the linked list - if (pMeta->superList == NULL) { - pMeta->superList = pTable; - pTable->next = NULL; - pTable->prev = NULL; - } else { - pTable->next = pMeta->superList; - pTable->prev = NULL; - pTable->next->prev = pTable; - pMeta->superList = pTable; +static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { + STsdbMeta *pMeta = pRepo->tsdbMeta; + + if (addIdx && tsdbWLockRepoMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + return -1; + } + + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + if (tdListAppend(pMeta->superList, (void *)(&pTable)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + goto _err; } } else { - // add non-super table to the array - pMeta->tables[pTable->tableId.tid] = pTable; - if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index - tsdbAddTableIntoIndex(pMeta, pTable); - } - if (pTable->type == TSDB_STREAM_TABLE && addIdx) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, pTable->tableId.uid, pTable->tableId.tid, pTable->sql, tsdbGetTableSchema(pMeta, pTable)); + pMeta->tables[TABLE_TID(pTable)] = pTable; + if (TABLE_TID(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index + if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) { + tsdbTrace("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), tstrerror(terrno)); + goto _err; + } } - + pMeta->nTables++; - } - // Update the pMeta->maxCols and pMeta->maxRowBytes - if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE) { - if (schemaNCols(pTable->schema[pTable->numOfSchemas - 1]) > pMeta->maxCols) - pMeta->maxCols = schemaNCols(pTable->schema[pTable->numOfSchemas - 1]); - int bytes = dataRowMaxBytesFromSchema(pTable->schema[pTable->numOfSchemas - 1]); - if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes; + return 0; } - if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable), sizeof(pTable)) < 0) { + if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable), + sizeof(pTable)) < 0) { return -1; } + + if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1; + + tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + TABLE_TID(pTable), TALBE_UID(pTable)); return 0; + +_err: + tsdbRemoveTableFromIndex() + if (addIdx) tsdbUnlockRepoMeta(pRepo); + return -1; } -static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx) { +static int tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx) { if (pTable->type == TSDB_SUPER_TABLE) { SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); while (tSkipListIterNext(pIter)) {