提交 22a2a660 编写于 作者: H Hongze Cheng

[TD-5812]<hotfix> : fix possible coredump change schema

上级 a1c586ba
......@@ -24,8 +24,7 @@ typedef struct STable {
tstr* name; // NOTE: there a flexible string here
uint64_t suid;
struct STable* pSuper; // super table pointer
uint8_t numOfSchemas;
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
SArray* schema;
STSchema* tagSchema;
SKVRow tagVal;
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
......@@ -107,10 +106,9 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
if (lock) TSDB_RLOCK_TABLE(pDTable);
if (_version < 0) { // get the latest version of schema
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
pTSchema = *(STSchema **)taosArrayGetLast(pTable->schema);
} else { // get the schema with version
void* ptr = taosbsearch(&_version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
tsdbCompareSchemaVersion, TD_EQ);
void* ptr = taosArraySearch(pTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
......
......@@ -44,6 +44,8 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema);
static int tsdbAddSchema(STable *pTable, STSchema *pSchema);
static void tsdbFreeTableSchema(STable *pTable);
// ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) {
......@@ -723,17 +725,10 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
STsdbMeta *pMeta = pRepo->tsdbMeta;
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
ASSERT(schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pTable->schema)));
TSDB_WLOCK_TABLE(pCTable);
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
} else {
ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
tdFreeSchema(pCTable->schema[0]);
memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
pCTable->schema[pCTable->numOfSchemas - 1] = pSchema;
}
tsdbAddSchema(pCTable, pSchema);
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
......@@ -829,9 +824,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
TABLE_TID(pTable) = -1;
TABLE_SUID(pTable) = -1;
pTable->pSuper = NULL;
pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
......@@ -842,7 +835,8 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
}
pTable->tagVal = NULL;
STColumn *pCol = schemaColAt(pTable->tagSchema, DEFAULT_TAG_INDEX_COLUMN);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL, SL_ALLOW_DUP_KEY, getTagIndexKey);
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, colType(pCol), (uint8_t)(colBytes(pCol)), NULL,
SL_ALLOW_DUP_KEY, getTagIndexKey);
if (pTable->pIndex == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......@@ -871,9 +865,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pST
}
} else {
TABLE_SUID(pTable) = -1;
pTable->numOfSchemas = 1;
pTable->schema[0] = tdDupSchema(pCfg->schema);
if (pTable->schema[0] == NULL) {
if (tsdbAddSchema(pTable, tdDupSchema(pCfg->schema)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
......@@ -907,9 +899,7 @@ static void tsdbFreeTable(STable *pTable) {
TABLE_UID(pTable));
tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
tdFreeSchema(pTable->schema[i]);
}
tsdbFreeTableSchema(pTable);
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tdFreeSchema(pTable->tagSchema);
......@@ -1261,9 +1251,10 @@ static int tsdbEncodeTable(void **buf, STable *pTable) {
tlen += taosEncodeFixedU64(buf, TABLE_SUID(pTable));
tlen += tdEncodeKVRow(buf, pTable->tagVal);
} else {
tlen += taosEncodeFixedU8(buf, pTable->numOfSchemas);
for (int i = 0; i < pTable->numOfSchemas; i++) {
tlen += tdEncodeSchema(buf, pTable->schema[i]);
tlen += taosEncodeFixedU8(buf, taosArrayGetSize(pTable->schema));
for (int i = 0; i < taosArrayGetSize(pTable->schema); i++) {
STSchema *pSchema = taosArrayGetP(pTable->schema, i);
tlen += tdEncodeSchema(buf, pSchema);
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
......@@ -1294,9 +1285,12 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
buf = taosDecodeFixedU64(buf, &TABLE_SUID(pTable));
buf = tdDecodeKVRow(buf, &(pTable->tagVal));
} else {
buf = taosDecodeFixedU8(buf, &(pTable->numOfSchemas));
for (int i = 0; i < pTable->numOfSchemas; i++) {
buf = tdDecodeSchema(buf, &(pTable->schema[i]));
uint8_t nSchemas;
buf = taosDecodeFixedU8(buf, &nSchemas);
for (int i = 0; i < nSchemas; i++) {
STSchema *pSchema;
buf = tdDecodeSchema(buf, &pSchema);
tsdbAddSchema(pTable, pSchema);
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
......@@ -1458,3 +1452,36 @@ static int tsdbCheckTableTagVal(SKVRow *pKVRow, STSchema *pSchema) {
return 0;
}
static int tsdbAddSchema(STable *pTable, STSchema *pSchema) {
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
if (pTable->schema == NULL) {
pTable->schema = taosArrayInit(TSDB_MAX_TABLE_SCHEMAS, sizeof(SSchema *));
if (pTable->schema == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
}
ASSERT(taosArrayGetSize(pTable->schema) == 0 ||
schemaVersion(pSchema) > schemaVersion(*(STSchema **)taosArrayGetLast(pTable->schema)));
if (taosArrayPush(pTable->schema, &pSchema) == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static void tsdbFreeTableSchema(STable *pTable) {
ASSERT(pTable != NULL);
if (pTable->schema) {
for (size_t i = 0; i < taosArrayGetSize(pTable->schema); i++) {
STSchema *pSchema = taosArrayGetP(pTable->schema, i);
tdFreeSchema(pSchema);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册