提交 2d335332 编写于 作者: C Cary Xu

revert

上级 46f61ddd
......@@ -74,7 +74,7 @@ void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version, int8_t rowType);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
......@@ -100,8 +100,7 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
}
// set rowType to -1 at default if have no relationship with row
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version,
int8_t rowType) {
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
......@@ -112,12 +111,8 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
} else { // get the schema with version
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
if (rowType == SMEM_ROW_KV) {
ptr = taosArrayGetLast(pDTable->schema);
} else {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
pTSchema = *(STSchema**)ptr;
}
......@@ -136,7 +131,7 @@ _exit:
}
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
}
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
......
......@@ -1021,7 +1021,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pCommith->pTable = pTable;
......@@ -1439,7 +1439,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
} else if (key1 > key2) {
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
......@@ -1461,7 +1461,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
//copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL);
}
......
......@@ -431,7 +431,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1, -1);
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
taosArrayClear(pComph->aSupBlk);
if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
......
......@@ -624,7 +624,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0);
tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
}
}
}
......
......@@ -587,7 +587,7 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
......@@ -735,7 +735,7 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) {
*ppSchema1 = pSchema2;
} else {
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1), (int8_t)memRowType(row1));
*ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1));
}
pSchema1 = *ppSchema1;
}
......@@ -744,7 +744,7 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
if(schemaVersion(pSchema1) == dv2) {
pSchema2 = pSchema1;
} else {
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2), (int8_t)memRowType(row2));
*ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2));
pSchema2 = *ppSchema2;
}
}
......@@ -852,7 +852,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
}
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion, -1);
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
......@@ -899,7 +899,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
......@@ -956,7 +956,7 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion, -1) == NULL) {
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
......@@ -977,7 +977,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
return;
}
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row), (int8_t)memRowType(row));
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
if (pSchema == NULL) {
return;
}
......
......@@ -545,8 +545,8 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
return *(STable **)ptr;
}
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t _version, int8_t rowType) {
return tsdbGetTableSchemaImpl(pTable, true, false, _version, rowType);
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t _version) {
return tsdbGetTableSchemaImpl(pTable, true, false, _version);
}
int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
......@@ -664,7 +664,7 @@ int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
}
STSchema* tsdbGetTableLatestSchema(STable *pTable) {
return tsdbGetTableSchemaByVersion(pTable, -1, -1);
return tsdbGetTableSchemaByVersion(pTable, -1);
}
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema) {
......@@ -969,7 +969,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
}
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
}
......@@ -977,7 +977,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 1);
tsdbGetTableSchemaImpl(pTable, false, false, -1), 1);
}
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
......@@ -996,7 +996,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
SListNode *pNode = NULL;
STable * tTable = NULL;
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int maxCols = schemaNCols(pSchema);
int maxRowBytes = schemaTLen(pSchema);
......@@ -1030,7 +1030,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
for (int i = 0; i < pMeta->maxTables; i++) {
STable *_pTable = pMeta->tables[i];
if (_pTable != NULL) {
pSchema = tsdbGetTableSchemaImpl(_pTable, false, false, -1, -1);
pSchema = tsdbGetTableSchemaImpl(_pTable, false, false, -1);
maxCols = MAX(maxCols, schemaNCols(pSchema));
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
}
......
......@@ -1570,7 +1570,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
int32_t numOfColsOfRow1 = 0;
if (pSchema1 == NULL) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
}
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
......@@ -1582,7 +1582,7 @@ static void mergeTwoRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
if(row2) {
isRow2DataRow = isDataRow(row2);
if (pSchema2 == NULL) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
......@@ -1949,11 +1949,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (rv1 != memRowVersion(row1)) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = memRowVersion(row1);
}
if(row2 && rv2 != memRowVersion(row2)) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = memRowVersion(row2);
}
......@@ -1974,11 +1974,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos);
}
if (rv1 != memRowVersion(row1)) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = memRowVersion(row1);
}
if(row2 && rv2 != memRowVersion(row2)) {
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2), (int8_t)memRowType(row2));
pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = memRowVersion(row2);
}
......@@ -2642,7 +2642,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->ekey = key;
if (rv != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row), (int8_t)memRowType(row));
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
rv = memRowVersion(row);
}
mergeTwoRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pTable, pSchema, NULL, true);
......
......@@ -153,7 +153,7 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
}
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
pReadh->pTable = pTable;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册