提交 30a64bdc 编写于 作者: H Haojun Liao

fix(query): set correct schema.

上级 0aef734a
......@@ -14,6 +14,7 @@
*/
#include "tsdb.h"
#include "osDef.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
typedef enum {
......@@ -129,7 +130,8 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema;
STSchema* pSchema;// the newest version schema
STSchema* pMemSchema;// the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader;
SVersionRange verRange;
......@@ -145,11 +147,10 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI
SRowMerger* pMerger);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
......@@ -1230,6 +1231,31 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false;
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1);
}
if (sversion == pReader->pSchema->version) {
return pReader->pSchema;
}
if (pReader->pMemSchema == NULL) {
int32_t code =
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
return pReader->pMemSchema;
}
if (pReader->pMemSchema->version == sversion) {
return pReader->pMemSchema;
}
taosMemoryFree(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
return pReader->pMemSchema;
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key) {
SRowMerger merge = {0};
......@@ -1241,6 +1267,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SArray* pDelList = pBlockScanInfo->delSkyline;
bool freeTSRow = false;
uint64_t uid = pBlockScanInfo->uid;
// ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) {
......@@ -1276,9 +1303,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMergerGetRow(&merge, &pTSRow);
}
} else { // descending order: mem rows -----> imem rows ------> file block
updateSchema(pRow, pBlockScanInfo->uid, pReader);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pReader->pSchema);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, &fRow);
......@@ -1289,7 +1316,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
tRowMergerClear(&merge);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
......@@ -1333,7 +1360,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
return TSDB_CODE_SUCCESS;
} else { // key > ik.ts || key > k.ts
ASSERT(key != ik.ts);
......@@ -1342,7 +1369,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
......@@ -1353,7 +1380,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
......@@ -1365,7 +1392,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(key > ik.ts && key > k.ts);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -1373,9 +1400,9 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else { // descending order scan
// [1/2] k.ts >= ik.ts && k.ts >= key
if (k.ts >= ik.ts && k.ts >= key) {
updateSchema(pRow, uid, pReader);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pReader->pSchema);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
if (ik.ts == k.ts) {
......@@ -1390,7 +1417,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
return TSDB_CODE_SUCCESS;
} else {
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
......@@ -1399,7 +1426,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [4] ik.ts > key >= k.ts
if (ik.ts > key) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
......@@ -1414,7 +1441,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -1427,7 +1454,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS;
......@@ -1495,7 +1522,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
......@@ -2146,6 +2173,7 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
}
}
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader) {
while (1) {
......@@ -2166,22 +2194,8 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
break;
}
int32_t sversion = TSDBROW_SVERSION(pRow);
STSchema* pTSchema = NULL;
if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
if (pReader->pSchema == NULL) {
pReader->pSchema = pTSchema;
}
} else {
pTSchema = pReader->pSchema;
}
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
tRowMergerAdd(pMerger, pRow, pTSchema);
if (pTSchema != pReader->pSchema) {
taosMemoryFree(pTSchema);
}
}
return TSDB_CODE_SUCCESS;
......@@ -2289,21 +2303,13 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS;
}
void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
int32_t sversion = TSDBROW_SVERSION(pRow);
if (pReader->pSchema == NULL) {
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema);
} else if (pReader->pSchema->version != sversion) {
taosMemoryFreeClear(pReader->pSchema);
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema);
}
}
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) {
{ // if the timestamp of the next valid row has a different ts, return current row directly
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
{ // if the timestamp of the next valid row has a different ts, return current row directly
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
......@@ -2311,14 +2317,14 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
*freeTSRow = false;
return;
} else { // has next point in mem/imem
TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader);
pNextRow = getValidRow(pIter, pDelList, pReader);
if (pNextRow == NULL) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
}
if (TSDBROW_KEY(&current).ts != TSDBROW_KEY(pNextRow).ts) {
if (current.pTSRow->ts != pNextRow->pTSRow->ts) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
return;
......@@ -2327,30 +2333,20 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
}
SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
// get the correct schema for data in memory
int32_t sversion = TSDBROW_SVERSION(pRow);
STSchema* pTSchema = NULL;
if (pReader->pSchema == NULL || sversion != pReader->pSchema->version) {
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema);
if (pReader->pSchema == NULL) {
pReader->pSchema = pTSchema;
}
} else {
pTSchema = pReader->pSchema;
}
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
tRowMergerInit2(&merge, pReader->pSchema, pRow, pTSchema);
doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader);
tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
tRowMergerAdd(&merge, pNextRow, pTSchema1);
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
*freeTSRow = true;
if (sversion != pReader->pSchema->version) {
taosMemoryFree(pTSchema);
}
}
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
......@@ -2361,17 +2357,17 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
updateSchema(piRow, pBlockScanInfo->uid, pReader);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pReader->pSchema);
tRowMergerInit(&merge, piRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, pRow);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
} else {
updateSchema(pRow, pBlockScanInfo->uid, pReader);
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pReader->pSchema);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, piRow);
......@@ -2432,12 +2428,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS;
}
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid) {
int32_t numOfRows = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
STSchema* pSchema = pReader->pSchema;
STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid);
SColVal colVal = {0};
int32_t i = 0, j = 0;
......@@ -2453,7 +2449,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
col_id_t colId = pColInfoData->info.colId;
if (colId == pSchema->columns[j].colId) {
tTSRowGetVal(pTSRow, pReader->pSchema, j, &colVal);
tTSRowGetVal(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
i += 1;
j += 1;
......@@ -2529,7 +2525,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
break;
}
doAppendRowFromTSRow(pBlock, pReader, pTSRow);
doAppendRowFromTSRow(pBlock, pReader, pTSRow, pBlockScanInfo->uid);
if (freeTSRow) {
taosMemoryFree(pTSRow);
}
......@@ -2755,6 +2751,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
taosMemoryFree(pReader->pMemSchema);
taosMemoryFreeClear(pReader);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册