From b823afbd6e1a6408132d5c5111e2bd59156e2fe5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 3 Jul 2020 18:28:51 +0800 Subject: [PATCH] update table schema change code --- src/client/src/tscUtil.c | 4 +- src/inc/taoserror.h | 1 + src/inc/taosmsg.h | 3 +- src/tsdb/inc/tsdbMain.h | 43 +++++++ src/tsdb/src/tsdbMain.c | 216 +++++++++++++++++++++++------------- src/tsdb/src/tsdbMemTable.c | 11 +- src/tsdb/src/tsdbMeta.c | 61 +++++----- src/tsdb/src/tsdbRWHelper.c | 2 +- 8 files changed, 226 insertions(+), 115 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9b6eff7123..74d41304a4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); for(int32_t j = 0; j < numOfCols; ++j) { STColumn* pCol = (STColumn*) pDataBlock; - pCol->colId = pSchema[j].colId; + pCol->colId = htons(pSchema[j].colId); pCol->type = pSchema[j].type; - pCol->bytes = pSchema[j].bytes; + pCol->bytes = htons(pSchema[j].bytes); pCol->offset = 0; pDataBlock += sizeof(STColumn); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 4a7d86c434..6139d4bf49 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 6155f08e76..8b856ccdd9 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -203,8 +203,7 @@ typedef struct SSubmitBlk { typedef struct SSubmitMsg { SMsgHead header; int32_t length; - int32_t compressed : 2; - int32_t numOfBlocks : 30; + int32_t numOfBlocks; SSubmitBlk blocks[]; } SSubmitMsg; diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 3ae88c3141..f431830d94 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -308,6 +308,49 @@ int tsdbRLockRepoMeta(STsdbRepo* pRepo); int tsdbUnlockRepoMeta(STsdbRepo* pRepo); void tsdbRefTable(STable* pTable); void tsdbUnRefTable(STable* pTable); +void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); + +static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { + if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { + return -1; + } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { + return 1; + } else { + return 0; + } +} + +static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { + ASSERT(TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); + STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STSchema* pSchema = NULL; + STSchema* pTSchema = NULL; + + if (lock) taosRLockLatch(&(pDTable->latch)); + if (version < 0) { // get the latest version of schema + pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; + } else { // get the schema with version + void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), + tsdbCompareSchemaVersion, TD_EQ); + if (ptr == NULL) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _exit; + } + pTSchema = *(STSchema**)ptr; + } + + ASSERT(pTSchema != NULL); + + if (copy) { + if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + } else { + pSchema = pTSchema; + } + +_exit: + if (lock) taosRUnLockLatch(&(pDTable->latch)); + return pSchema; +} // ------------------ tsdbBuffer.c STsdbBufPool* tsdbNewBufPool(); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 4b9e977a1b..7f8144db13 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -41,9 +41,9 @@ typedef struct { } SSubmitBlkIter; typedef struct { - int32_t totalLen; - int32_t len; - SSubmitBlk *pBlock; + int32_t totalLen; + int32_t len; + void * pMsg; } SSubmitMsgIter; static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); @@ -56,7 +56,7 @@ static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static void tsdbFreeRepo(STsdbRepo *pRepo); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows); -static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); @@ -68,6 +68,7 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -164,6 +165,15 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * STsdbRepo * pRepo = (STsdbRepo *)repo; SSubmitMsgIter msgIter = {0}; + if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) { + if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + return 0; + } else { + tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + } + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) { tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; @@ -173,12 +183,14 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * int32_t affectedrows = 0; TSKEY now = taosGetTimestamp(pRepo->config.precision); - - while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { + while (true) { + tsdbGetSubmitMsgNext(&msgIter, &pBlock); + if (pBlock == NULL) break; if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { return -1; } } + if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); return 0; } @@ -263,7 +275,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, - tsdbGetTableSchema(pTable)); + tsdbGetTableSchemaImpl(pTable, false, false, -1)); } } } @@ -694,17 +706,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { return -1; } - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - pMsg->compressed = htonl(pMsg->compressed); - pIter->totalLen = pMsg->length; - pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; + pIter->len = 0; + pIter->pMsg = pMsg; if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; - } else { - pIter->pBlock = pMsg->blocks; } return 0; @@ -714,26 +721,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY STsdbMeta *pMeta = pRepo->tsdbMeta; int64_t points = 0; - STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid); - if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { - tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - return -1; - } + STable *pTable = pMeta->tables[pBlock->tid]; + ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; @@ -764,27 +753,23 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY return 0; } -static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { - SSubmitBlk *pBlock = pIter->pBlock; - if (pBlock == NULL) return NULL; - - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - - pBlock->sversion = htonl(pBlock->sversion); - pBlock->padding = htonl(pBlock->padding); - - pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen; - if (pIter->len >= pIter->totalLen) { - pIter->pBlock = NULL; +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + if (pIter->len == 0) { + pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE; } else { - pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk)); + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); } - return pBlock; + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + + return 0; } static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { @@ -969,42 +954,64 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { ASSERT(pTable != NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); int sversion = schemaVersion(pSchema); - if (pBlock->sversion == sversion) return 0; - if (pBlock->sversion > sversion) { // need to config - tsdbDebug("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion); - if (pRepo->appH.configFunc) { - void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); - if (msg == NULL) { - tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - TABLE_TID(pTable), tstrerror(terrno)); + if (pBlock->sversion == sversion) { + return 0; + } else { + if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + } + + if (pBlock->sversion > sversion) { // may need to update table schema + if (pBlock->schemaLen > 0) { + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0); + int numOfCols = pBlock->schemaLen / sizeof(STColumn); + STColumn *pTCol = (STColumn *)pBlock->data; + + STSchemaBuilder schemaBuilder = {0}; + if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); return -1; } - STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - if (pTableCfg == NULL) { - rpcFreeCont(msg); - return -1; + for (int i = 0; i < numOfCols; i++) { + if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + tdDestroyTSchemaBuilder(&schemaBuilder); + return -1; + } } - if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) { - tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); + STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder); + if (pNSchema == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdDestroyTSchemaBuilder(&schemaBuilder); return -1; } - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); + + tdDestroyTSchemaBuilder(&schemaBuilder); + tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true); } else { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE; return -1; } } else { - if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) { + ASSERT(pBlock->sversion >= 0); + if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) { tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); } @@ -1013,7 +1020,64 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT } return 0; - } +} + +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { + ASSERT(pMsg != NULL); + STsdbMeta * pMeta = pRepo->tsdbMeta; + SSubmitMsgIter msgIter = {0}; + SSubmitBlk * pBlock = NULL; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + + if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + STable *pTable = pMeta->tables[pBlock->tid]; + if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + // Check schema version and update schema if needed + if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { + if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + continue; + } else { + return -1; + } + } + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // TODO diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index d3b9081a36..af86de5aa8 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -538,10 +538,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe SCommitIter *pIter = iters + tid; if (pIter->pTable == NULL) continue; + taosRLockLatch(&(pIter->pTable->latch)); + tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); if (pIter->pIter != NULL) { - tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable)); + tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int nLoop = 0; @@ -557,6 +559,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); ASSERT(rowsWritten != 0); if (rowsWritten < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), tstrerror(terrno)); @@ -571,6 +574,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ASSERT(pDataCols->numOfRows == 0); } + taosRUnLockLatch(&(pIter->pTable->latch)); + // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -680,10 +685,10 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter if (dataRowKey(row) > maxKey) break; if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row)); if (pSchema == NULL) { // TODO: deal with the error here - ASSERT(false); + ASSERT(0); } } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index cbbf51d862..a473f86fa0 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -449,17 +449,7 @@ int tsdbCloseMeta(STsdbRepo *pRepo) { return 0; } -STSchema *tsdbGetTableSchema(STable *pTable) { - if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { - return pTable->schema[pTable->numOfSchemas - 1]; - } else if (pTable->type == TSDB_CHILD_TABLE) { - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->schema[pSuper->numOfSchemas - 1]; - } else { - return NULL; - } -} +STSchema *tsdbGetTableSchema(STable *pTable) { return tsdbGetTableSchemaImpl(pTable, true, true, -1); } STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid)); @@ -470,14 +460,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { } STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) { - STable *pSearchTable = (pTable->type == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; - if (pSearchTable == NULL) return NULL; - - void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *), - tsdbCompareSchemaVersion, TD_EQ); - if (ptr == NULL) return NULL; - - return *(STSchema **)ptr; + return tsdbGetTableSchemaImpl(pTable, true, true, version); } STSchema *tsdbGetTableTagSchema(STable *pTable) { @@ -575,7 +558,7 @@ void tsdbRefTable(STable *pTable) { void tsdbUnRefTable(STable *pTable) { int32_t ref = T_REF_DEC(pTable); - tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { // tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); @@ -587,17 +570,32 @@ void tsdbUnRefTable(STable *pTable) { } } -// ------------------ LOCAL FUNCTIONS ------------------ -static int tsdbCompareSchemaVersion(const void *key1, const void *key2) { - if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { - return -1; - } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { - return 1; +void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) { + ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); + + STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1])); + + taosWLockLatch(&(pCTable->latch)); + if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { + pCTable->schema[pCTable->numOfSchemas++] = pSchema; } else { - return 0; + ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS); + tdFreeSchema(pCTable->schema[0]); + memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1)); + pCTable->schema[pCTable->numOfSchemas - 1] = pSchema; + } + taosWUnLockLatch(&(pCTable->latch)); + + if (insertAct) { + int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable); + void *buf = tsdbAllocBytes(pRepo, tlen); + ASSERT(buf != NULL); + tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); } } +// ------------------ LOCAL FUNCTIONS ------------------ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { STsdbRepo *pRepo = (STsdbRepo *)pHandle; STable * pTable = NULL; @@ -809,14 +807,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { } if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); } if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable)); + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + tsdbGetTableSchemaImpl(pTable, false, false, -1)); } tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), @@ -836,7 +835,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro STable * tTable = NULL; STsdbCfg * pCfg = &(pRepo->config); - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); int maxCols = schemaNCols(pSchema); int maxRowBytes = schemaTLen(pSchema); @@ -870,7 +869,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro for (int i = 0; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable != NULL) { - pSchema = tsdbGetTableSchema(pTable); + pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); maxCols = MAX(maxCols, schemaNCols(pSchema)); maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema)); } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index 934fa8e733..bf4e5395a1 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -217,7 +217,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.uid = pTable->tableId.uid; - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); pHelper->tableInfo.sversion = schemaVersion(pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema); -- GitLab