提交 8cf8ac84 编写于 作者: M Minglei Jin

fix(tsdb/read): use correct scheme for mem & imem merging

上级 f1ff5dce
...@@ -177,15 +177,15 @@ struct STsdbReader { ...@@ -177,15 +177,15 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap; STsdbReadSnap* pReadSnap;
SIOCostSummary cost; SIOCostSummary cost;
STSchema* pSchema; // the newest version schema STSchema* pSchema; // the newest version schema
// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times // STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index; SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf; SBlockInfoBuf blockInfoBuf;
int32_t step; int32_t step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
}; };
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
...@@ -1862,11 +1862,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* ...@@ -1862,11 +1862,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion)); void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
if (p != NULL) { if (p != NULL) {
return *(STSchema**) p; return *(STSchema**)p;
} }
STSchema* ptr = NULL; STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return NULL; return NULL;
...@@ -3674,8 +3674,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3674,8 +3674,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -4000,7 +4001,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { ...@@ -4000,7 +4001,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
} }
static void freeSchemaFunc(void* param) { static void freeSchemaFunc(void* param) {
void* p = *(void**) param; void* p = *(void**)param;
taosMemoryFree(p); taosMemoryFree(p);
} }
...@@ -4082,7 +4083,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -4082,7 +4083,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) { if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader", pReader->idStr); tsdbError("failed init schema hash for reader %s", pReader->idStr);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -4710,7 +4711,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -4710,7 +4711,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
pReader->order = pCond->order; pReader->order = pCond->order;
...@@ -4731,9 +4732,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -4731,9 +4732,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc? 1:-1; int32_t step = asc ? 1 : -1;
int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1; int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
int32_t code = 0; int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册