提交 865ddbc0 编写于 作者: L lichuang

[TD-4034]fix schema change

上级 c0114ae1
......@@ -38,11 +38,15 @@ typedef struct STable {
SRWLatch latch; // TODO: implementa latch functions
SDataCol *lastCols;
int32_t lastColNum;
int32_t restoreColumnNum;
int16_t lastColNum;
int16_t maxColumnNum;
int lastColSVersion;
T_REF_DECLARE()
} STable;
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
#define TSDB_LATEST_COLUMN_ARRAY_ADD_SIZE 5
typedef struct {
pthread_rwlock_t rwLock;
......@@ -82,6 +86,9 @@ void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdbRepo* pRepo);
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
......
......@@ -617,18 +617,27 @@ static void tsdbStopStream(STsdbRepo *pRepo) {
}
static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
if (pTable->numOfSchemas == 0) {
return 0;
}
SBlock* pBlock;
int numColumns;
int32_t blockIdx;
SDataStatis* pBlockStatis = NULL;
SDataRow row = NULL;
STSchema *pSchema = tsdbGetTableSchema(pTable);
// restore last column data with last schema
STSchema *pSchema = pTable->schema[pTable->numOfSchemas - 1];
int err = 0;
numColumns = schemaNCols(pSchema);
if (numColumns <= pTable->restoreColumnNum) {
if (numColumns <= pTable->maxColumnNum) {
return 0;
}
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) {
return -1;
}
}
row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
if (row == NULL) {
......@@ -660,7 +669,7 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
SBlockIdx *pIdx = pReadh->pBlkIdx;
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
while (numColumns > pTable->maxColumnNum && blockIdx >= 0) {
bool loadStatisData = false;
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
blockIdx -= 1;
......@@ -678,18 +687,8 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
loadStatisData = true;
}
for (uint32_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
for (int16_t i = 0; i < numColumns && numColumns > pTable->maxColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
if (i >= pTable->lastColNum) {
pTable->lastCols = realloc(pTable->lastCols, i + 5);
for (int m = 0; m < 5; ++m) {
pTable->lastCols[m + pTable->lastColNum].bytes = 0;
pTable->lastCols[m + pTable->lastColNum].pData = NULL;
}
pTable->lastColNum += i + 5;
}
// ignore loaded columns
if (pTable->lastCols[i].bytes != 0) {
continue;
......@@ -710,11 +709,15 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
continue;
}
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
if (idx == -1) {
tsdbError("restoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
continue;
}
// save not-null column
SDataCol *pLastCol = &(pTable->lastCols[i]);
SDataCol *pLastCol = &(pTable->lastCols[idx]);
pLastCol->pData = malloc(pCol->bytes);
pLastCol->bytes = pCol->bytes;
pLastCol->offset = pCol->offset;
pLastCol->colId = pCol->colId;
memcpy(pLastCol->pData, value, pCol->bytes);
......@@ -722,11 +725,11 @@ static int restoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh)
pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0);
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
pLastCol->ts = dataRowTKey(row);
pLastCol->ts = dataRowTKey(row);
pTable->restoreColumnNum += 1;
pTable->maxColumnNum += 1;
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %d", REPO_ID(pRepo), pTable->name->data, pCol->colId, (int32_t)pLastCol->ts);
tsdbInfo("restoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
break;
}
}
......@@ -757,7 +760,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
pTable->restoreColumnNum = 0;
pTable->maxColumnNum = 0;
}
}
......@@ -822,7 +825,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
}
// restore NULL columns
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg)) {
if (restoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh);
return -1;
......
......@@ -972,7 +972,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
}
STSchema* pSchema = pTable->schema[pTable->numOfSchemas - 1];
int i = pTable->numOfSchemas - 1;
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
return;
}
int16_t i = pTable->numOfSchemas - 1;
while ((pSchema == NULL || pSchema->version != dataRowVersion(row)) && i >= 0) {
i -= 1;
pSchema = pTable->schema[i];
......@@ -983,23 +987,20 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
SDataCol *pLatestCols = pTable->lastCols;
for (int j = 0; j < schemaNCols(pSchema); j++) {
for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
STColumn *pTCol = schemaColAt(pSchema, j);
if (pTCol->colId >= pTable->lastColNum) {
pTable->lastCols = realloc(pTable->lastCols, pTCol->colId + 5);
for (i = 0; i < 10; ++i) {
pTable->lastCols[i + pTable->lastColNum].bytes = 0;
pTable->lastCols[i + pTable->lastColNum].pData = NULL;
}
pTable->lastColNum += pTCol->colId + 5;
// ignore not exist colId
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pTCol->colId);
if (idx == -1) {
continue;
}
SDataCol *pDataCol = &(pLatestCols[pTCol->colId]);
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (isNullN(value, pTCol->type)) {
continue;
}
SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pSchema->columns[j].bytes);
pDataCol->bytes = pSchema->columns[j].bytes;
......
......@@ -14,7 +14,6 @@
*/
#include "tsdbint.h"
#define TSDB_LATEST_COLUMN_ARRAY_SIZE 20
#define TSDB_SUPER_TABLE_SL_LEVEL 5
#define DEFAULT_TAG_INDEX_COLUMN 0
......@@ -45,6 +44,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
static void tsdbFreeLastColumns(STable* pTable);
// ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
......@@ -590,6 +590,116 @@ void tsdbUnRefTable(STable *pTable) {
}
}
static void tsdbFreeLastColumns(STable* pTable) {
if (pTable->lastCols == NULL) {
return;
}
for (int i = 0; i < pTable->lastColNum; ++i) {
if (pTable->lastCols[i].bytes == 0) {
continue;
}
tfree(pTable->lastCols[i].pData);
pTable->lastCols[i].bytes = 0;
pTable->lastCols[i].pData = NULL;
}
tfree(pTable->lastCols);
pTable->lastCols = NULL;
pTable->lastColNum = 0;
}
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
if (pTable->lastCols == NULL) {
return -1;
}
for (int16_t i = 0; i < pTable->lastColNum; ++i) {
if (pTable->lastCols[i].colId == colId) {
return i;
}
}
return -1;
}
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
ASSERT(pTable->lastCols == NULL);
int16_t numOfColumn = pSchema->numOfCols;
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
return -1;
}
for (int16_t i = 0; i < numOfColumn; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
SDataCol* pDataCol = &(pTable->lastCols[i]);
pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
}
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->lastColNum = numOfColumn;
pTable->maxColumnNum = 0;
return 0;
}
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
if (pTable->lastColSVersion == schemaVersion(pNewSchema)) {
return 0;
}
tsdbInfo("tsdbUpdateLastColSchema:%s,%d->%d", pTable->name->data, pTable->lastColSVersion, schemaVersion(pNewSchema));
int16_t numOfCols = pNewSchema->numOfCols;
SDataCol *lastCols = (SDataCol*)malloc(numOfCols * sizeof(SDataCol));
if (lastCols == NULL) {
return -1;
}
TSDB_WLOCK_TABLE(pTable);
int16_t oldIdx = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
STColumn *pCol = schemaColAt(pNewSchema, i);
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
SDataCol* pDataCol = &(lastCols[i]);
if (idx != -1) {
SDataCol* pOldDataCol = &(pTable->lastCols[idx]);
memcpy(pDataCol, pOldDataCol, sizeof(SDataCol));
} else {
pDataCol->colId = pCol->colId;
pDataCol->bytes = 0;
pDataCol->pData = NULL;
}
// free dropped column data
while (oldIdx < idx && oldIdx < pTable->lastColNum) {
SDataCol* pOldDataCol = &(pTable->lastCols[oldIdx]);
if (pOldDataCol->bytes != 0) {
tfree(pOldDataCol->pData);
pOldDataCol->bytes = 0;
}
++oldIdx;
}
if (idx != -1 && oldIdx == idx) {
oldIdx += 1;
}
}
// free old schema last column datas
tfree(pTable->lastCols);
pTable->lastColSVersion = schemaVersion(pNewSchema);
pTable->lastCols = lastCols;
pTable->lastColNum = numOfCols;
TSDB_WUNLOCK_TABLE(pTable);
return 0;
}
void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) {
ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
STsdbMeta *pMeta = pRepo->tsdbMeta;
......@@ -672,14 +782,11 @@ static STable *tsdbNewTable() {
}
pTable->lastKey = TSKEY_INITIAL_VAL;
pTable->lastCols = (SDataCol*)malloc(TSDB_LATEST_COLUMN_ARRAY_SIZE * sizeof(SDataCol));
pTable->lastColNum = TSDB_LATEST_COLUMN_ARRAY_SIZE;
for (int i = 0; i < pTable->lastColNum; ++i) {
pTable->lastCols[i].bytes = 0;
pTable->lastCols[i].pData = NULL;
}
pTable->restoreColumnNum = 0;
pTable->lastCols = NULL;
pTable->maxColumnNum = 0;
pTable->lastColNum = 0;
pTable->lastColSVersion = -1;
return pTable;
}
......@@ -796,14 +903,7 @@ static void tsdbFreeTable(STable *pTable) {
taosTZfree(pTable->lastRow);
tfree(pTable->sql);
for (int i = 0; i < pTable->lastColNum; ++i) {
if (pTable->lastCols[i].pData == NULL) {
continue;
}
free(pTable->lastCols[i].pData);
}
tfree(pTable->lastCols);
tsdbFreeLastColumns(pTable);
free(pTable);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册