未验证 提交 365843ec 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #7947 from taosdata/hotfix/TD-6636-M

<TD-6636><hotfix>: row convert fix and schema version fetch optimize
......@@ -580,7 +580,7 @@ static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSc
SKVRow kvRow = memRowKvBody(dest);
memRowSetType(dest, SMEM_ROW_KV);
memRowSetKvVersion(kvRow, dataRowVersion(dataRow));
memRowSetKvVersion(dest, dataRowVersion(dataRow));
kvRowSetNCols(kvRow, nBoundCols);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
......
......@@ -74,7 +74,7 @@ void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version, int8_t rowType);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
......@@ -99,7 +99,9 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
}
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
// set rowType to -1 at default if have no relationship with row
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version,
int8_t rowType) {
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
......@@ -110,8 +112,12 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
} else { // get the schema with version
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
if (rowType == SMEM_ROW_KV) {
ptr = taosArrayGetLast(pDTable->schema);
} else {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
}
pTSchema = *(STSchema**)ptr;
}
......@@ -130,7 +136,7 @@ _exit:
}
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
return tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
}
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
......
......@@ -866,7 +866,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
pCommith->pTable = pTable;
......@@ -1283,7 +1283,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
(*iter)++;
} else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
ASSERT(pSchema != NULL);
}
......@@ -1304,7 +1305,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
ASSERT(pSchema != NULL);
}
......
......@@ -431,7 +431,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1, -1);
taosArrayClear(pComph->aSupBlk);
if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
......
......@@ -617,7 +617,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0);
}
}
}
......
......@@ -582,7 +582,7 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row));
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
......@@ -730,7 +730,7 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) {
*ppSchema1 = pSchema2;
} else {
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1));
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1), (int8_t)memRowType(row1));
}
pSchema1 = *ppSchema1;
}
......@@ -739,7 +739,7 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
if(schemaVersion(pSchema1) == dv2) {
pSchema2 = pSchema1;
} else {
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2));
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2), (int8_t)memRowType(row2));
pSchema2 = *ppSchema2;
}
}
......@@ -847,7 +847,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
}
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
......@@ -894,7 +894,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
......@@ -951,7 +951,7 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion, -1) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
......@@ -972,7 +972,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
return;
}
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row), (int8_t)memRowType(row));
if (pSchema == NULL) {
return;
}
......
......@@ -534,8 +534,8 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
return *(STable **)ptr;
}
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t _version) {
return tsdbGetTableSchemaImpl(pTable, true, false, _version);
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t _version, int8_t rowType) {
return tsdbGetTableSchemaImpl(pTable, true, false, _version, rowType);
}
int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
......@@ -652,7 +652,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
}
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
return tsdbGetTableSchemaByVersion(pTable, -1);
return tsdbGetTableSchemaByVersion(pTable, -1, -1);
}
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
......@@ -957,7 +957,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
}
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
}
......@@ -965,7 +965,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1), 1);
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 1);
}
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
......@@ -984,7 +984,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
SListNode *pNode = NULL;
STable * tTable = NULL;
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
int maxCols = schemaNCols(pSchema);
int maxRowBytes = schemaTLen(pSchema);
......@@ -1018,7 +1018,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
for (int i = 0; i < pMeta->maxTables; i++) {
STable *_pTable = pMeta->tables[i];
if (_pTable != NULL) {
pSchema = tsdbGetTableSchemaImpl(_pTable, false, false, -1);
pSchema = tsdbGetTableSchemaImpl(_pTable, false, false, -1, -1);
maxCols = MAX(maxCols, schemaNCols(pSchema));
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
}
......
......@@ -1582,7 +1582,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
int32_t numOfColsOfRow1 = 0;
if (pSchema1 == NULL) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
}
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
......@@ -1594,7 +1594,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if(row2) {
isRow2DataRow = isDataRow(row2);
if (pSchema2 == NULL) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
......@@ -1961,11 +1961,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (rv1 != memRowVersion(row1)) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
rv1 = memRowVersion(row1);
}
if(row2 && rv2 != memRowVersion(row2)) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
rv2 = memRowVersion(row2);
}
......@@ -1986,11 +1986,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos);
}
if (rv1 != memRowVersion(row1)) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
rv1 = memRowVersion(row1);
}
if(row2 && rv2 != memRowVersion(row2)) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
rv2 = memRowVersion(row2);
}
......@@ -2654,7 +2654,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->ekey = key;
if (rv != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row), (int8_t)memRowType(row));
rv = memRowVersion(row);
}
mergeTwoRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pTable, pSchema, NULL, true);
......
......@@ -153,7 +153,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
}
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
pReadh->pTable = pTable;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册