From 865ddbc0268239ea62d0344449e2abdf53557b86 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 19 May 2021 20:40:04 +0800 Subject: [PATCH] [TD-4034]fix schema change --- src/tsdb/inc/tsdbMeta.h | 11 ++- src/tsdb/src/tsdbMain.c | 45 ++++++------ src/tsdb/src/tsdbMemTable.c | 23 ++++--- src/tsdb/src/tsdbMeta.c | 132 +++++++++++++++++++++++++++++++----- 4 files changed, 161 insertions(+), 50 deletions(-) diff --git a/src/tsdb/inc/tsdbMeta.h b/src/tsdb/inc/tsdbMeta.h index a8c7a6c358..43c85d89cb 100644 --- a/src/tsdb/inc/tsdbMeta.h +++ b/src/tsdb/inc/tsdbMeta.h @@ -38,11 +38,15 @@ typedef struct STable { SRWLatch latch; // TODO: implementa latch functions SDataCol *lastCols; - int32_t lastColNum; - int32_t restoreColumnNum; + int16_t lastColNum; + int16_t maxColumnNum; + int lastColSVersion; T_REF_DECLARE() } STable; +#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 +#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5 + typedef struct { pthread_rwlock_t rwLock; @@ -82,6 +86,9 @@ void tsdbUnRefTable(STable* pTable); void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen); void tsdbOrgMeta(STsdbRepo* pRepo); +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index a7e1efb5ed..3241d617b6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -617,18 +617,27 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { + if (pTable->numOfSchemas == 0) { + return 0; + } SBlock* pBlock; int numColumns; int32_t blockIdx; SDataStatis* pBlockStatis = NULL; SDataRow row = NULL; - STSchema *pSchema = tsdbGetTableSchema(pTable); + // restore last column data with last schema + STSchema *pSchema = pTable->schema[pTable->numOfSchemas - 1]; int err = 0; numColumns = schemaNCols(pSchema); - if (numColumns <= pTable->restoreColumnNum) { + if (numColumns <= pTable->maxColumnNum) { return 0; } + if (pTable->lastColSVersion != schemaVersion(pSchema)) { + if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) { + return -1; + } + } row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); if (row == NULL) { @@ -660,7 +669,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) SBlockIdx *pIdx = pReadh->pBlkIdx; blockIdx = (int32_t)(pIdx->numOfBlocks - 1); - while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) { + while (numColumns > pTable->maxColumnNum && blockIdx >= 0) { bool loadStatisData = false; pBlock = pReadh->pBlkInfo->blocks + blockIdx; blockIdx -= 1; @@ -678,18 +687,8 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) loadStatisData = true; } - for (uint32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { + for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) { STColumn *pCol = schemaColAt(pSchema, i); - - if (i >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, i + 5); - for (int m = 0; m < 5; ++m) { - pTable->lastCols[m + pTable->lastColNum].bytes = 0; - pTable->lastCols[m + pTable->lastColNum].pData = NULL; - } - pTable->lastColNum += i + 5; - } - // ignore loaded columns if (pTable->lastCols[i].bytes != 0) { continue; @@ -710,11 +709,15 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) continue; } + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + if (idx == -1) { + tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); + continue; + } // save not-null column - SDataCol *pLastCol = &(pTable->lastCols[i]); + SDataCol *pLastCol = &(pTable->lastCols[idx]); pLastCol->pData = malloc(pCol->bytes); pLastCol->bytes = pCol->bytes; - pLastCol->offset = pCol->offset; pLastCol->colId = pCol->colId; memcpy(pLastCol->pData, value, pCol->bytes); @@ -722,11 +725,11 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); - pLastCol->ts = dataRowTKey(row); + pLastCol->ts = dataRowTKey(row); - pTable->restoreColumnNum += 1; + pTable->maxColumnNum += 1; - tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts); + tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } @@ -757,7 +760,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->restoreColumnNum = 0; + pTable->maxColumnNum = 0; } } @@ -822,7 +825,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } // restore NULL columns - if (CACHE_LAST_NULL_COLUMN(pCfg)) { + if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) { if (restoreLastColumns(pRepo, pTable, &readh) != 0) { tsdbDestroyReadH(&readh); return -1; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 70e27a5700..900c034a22 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -972,7 +972,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r } STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1]; - int i = pTable->numOfSchemas - 1; + if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { + return; + } + + int16_t i = pTable->numOfSchemas - 1; while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) { i -= 1; pSchema = pTable->schema[i]; @@ -983,23 +987,20 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r SDataCol *pLatestCols = pTable->lastCols; - for (int j = 0; j < schemaNCols(pSchema); j++) { + for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); - - if (pTCol->colId >= pTable->lastColNum) { - pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5); - for (i = 0; i < 10; ++i) { - pTable->lastCols[i + pTable->lastColNum].bytes = 0; - pTable->lastCols[i + pTable->lastColNum].pData = NULL; - } - pTable->lastColNum += pTCol->colId + 5; + // ignore not exist colId + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pTCol->colId); + if (idx == -1) { + continue; } - SDataCol *pDataCol = &(pLatestCols[pTCol->colId]); void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); if (isNullN(value, pTCol->type)) { continue; } + + SDataCol *pDataCol = &(pLatestCols[idx]); if (pDataCol->pData == NULL) { pDataCol->pData = malloc(pSchema->columns[j].bytes); pDataCol->bytes = pSchema->columns[j].bytes; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 5a108a5d06..d785d259fa 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -14,7 +14,6 @@ */ #include "tsdbint.h" -#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20 #define TSDB_SUPER_TABLE_SL_LEVEL 5 #define DEFAULT_TAG_INDEX_COLUMN 0 @@ -45,6 +44,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid); static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema); +static void tsdbFreeLastColumns(STable* pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { @@ -590,6 +590,116 @@ void tsdbUnRefTable(STable *pTable) { } } +static void tsdbFreeLastColumns(STable* pTable) { + if (pTable->lastCols == NULL) { + return; + } + + for (int i = 0; i < pTable->lastColNum; ++i) { + if (pTable->lastCols[i].bytes == 0) { + continue; + } + tfree(pTable->lastCols[i].pData); + pTable->lastCols[i].bytes = 0; + pTable->lastCols[i].pData = NULL; + } + tfree(pTable->lastCols); + pTable->lastCols = NULL; + pTable->lastColNum = 0; +} + +int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { + if (pTable->lastCols == NULL) { + return -1; + } + for (int16_t i = 0; i < pTable->lastColNum; ++i) { + if (pTable->lastCols[i].colId == colId) { + return i; + } + } + + return -1; +} + +int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { + ASSERT(pTable->lastCols == NULL); + + int16_t numOfColumn = pSchema->numOfCols; + + pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); + if (pTable->lastCols == NULL) { + return -1; + } + + for (int16_t i = 0; i < numOfColumn; ++i) { + STColumn *pCol = schemaColAt(pSchema, i); + SDataCol* pDataCol = &(pTable->lastCols[i]); + pDataCol->bytes = 0; + pDataCol->pData = NULL; + pDataCol->colId = pCol->colId; + } + + pTable->lastColSVersion = schemaVersion(pSchema); + pTable->lastColNum = numOfColumn; + pTable->maxColumnNum = 0; + return 0; +} + +int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) { + if (pTable->lastColSVersion == schemaVersion(pNewSchema)) { + return 0; + } + + tsdbInfo("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema)); + + int16_t numOfCols = pNewSchema->numOfCols; + SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol)); + if (lastCols == NULL) { + return -1; + } + + TSDB_WLOCK_TABLE(pTable); + + int16_t oldIdx = 0; + for (int16_t i = 0; i < numOfCols; ++i) { + STColumn *pCol = schemaColAt(pNewSchema, i); + int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); + + SDataCol* pDataCol = &(lastCols[i]); + if (idx != -1) { + SDataCol* pOldDataCol = &(pTable->lastCols[idx]); + memcpy(pDataCol, pOldDataCol, sizeof(SDataCol)); + } else { + pDataCol->colId = pCol->colId; + pDataCol->bytes = 0; + pDataCol->pData = NULL; + } + + // free dropped column data + while (oldIdx < idx && oldIdx < pTable->lastColNum) { + SDataCol* pOldDataCol = &(pTable->lastCols[oldIdx]); + if (pOldDataCol->bytes != 0) { + tfree(pOldDataCol->pData); + pOldDataCol->bytes = 0; + } + ++oldIdx; + } + if (idx != -1 && oldIdx == idx) { + oldIdx += 1; + } + } + + // free old schema last column datas + tfree(pTable->lastCols); + + pTable->lastColSVersion = schemaVersion(pNewSchema); + pTable->lastCols = lastCols; + pTable->lastColNum = numOfCols; + + TSDB_WUNLOCK_TABLE(pTable); + return 0; +} + void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) { ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -672,14 +782,11 @@ static STable *tsdbNewTable() { } pTable->lastKey = TSKEY_INITIAL_VAL; - pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol)); - pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE; - for (int i = 0; i < pTable->lastColNum; ++i) { - pTable->lastCols[i].bytes = 0; - pTable->lastCols[i].pData = NULL; - } - pTable->restoreColumnNum = 0; + pTable->lastCols = NULL; + pTable->maxColumnNum = 0; + pTable->lastColNum = 0; + pTable->lastColSVersion = -1; return pTable; } @@ -796,14 +903,7 @@ static void tsdbFreeTable(STable *pTable) { taosTZfree(pTable->lastRow); tfree(pTable->sql); - for (int i = 0; i < pTable->lastColNum; ++i) { - if (pTable->lastCols[i].pData == NULL) { - continue; - } - free(pTable->lastCols[i].pData); - } - tfree(pTable->lastCols); - + tsdbFreeLastColumns(pTable); free(pTable); } } -- GitLab