提交 c6daa68e 编写于 作者: H Haojun Liao

enh(query): opt merge life cycle.

上级 7360a6b6
......@@ -126,11 +126,13 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger);
// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema);
void tsdbRowMergerClear_rv(SRowMerger* pMerger);
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
......
......@@ -157,6 +157,7 @@ typedef struct SReaderStatus {
SFilesetIter fileIter;
SDataBlockIter blockIter;
SLDataIter* pLDataIter;
SRowMerger merger;
} SReaderStatus;
typedef struct SBlockInfoBuf {
......@@ -166,6 +167,15 @@ typedef struct SBlockInfoBuf {
int32_t numOfTables;
} SBlockInfoBuf;
typedef struct STsdbReaderAttr {
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
bool freeBlock;
SVersionRange verRange;
} STsdbReaderAttr;
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
......@@ -185,28 +195,26 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
STSchema* pSchema; // the newest version schema
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pInfo);
STableBlockScanInfo* pScanInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
......@@ -214,7 +222,7 @@ static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY*
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
......@@ -235,7 +243,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
......@@ -1889,7 +1897,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return code;
}
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
......@@ -1912,6 +1920,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
terrno = code;
return NULL;
}
code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
if (code != 0) {
terrno = code;
return NULL;
}
}
if (pReader->pSchema && sversion == pReader->pSchema->version) {
......@@ -1989,11 +2003,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true; // todo check if pReader->pSchema is null or not
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
......@@ -2002,7 +2016,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2019,12 +2033,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, pRow, pSchema);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2037,12 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
return code;
}
......@@ -2054,7 +2068,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2067,12 +2081,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
......@@ -2112,7 +2126,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2136,7 +2150,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
}
} else { // not merge block data
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2145,7 +2159,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
code = tsdbRowMergerGetRow(&merge, &pTSRow);
......@@ -2185,22 +2199,22 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
SRowMerger merge = {0};
SRow* pTSRow = NULL;
SRowMerger* pMerger = &pReader->status.merger;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(&merge, &fRow1, NULL);
tsdbRowMergerAdd(pMerger, &fRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2208,7 +2222,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
return code;
} else {
return TSDB_CODE_SUCCESS;
......@@ -2296,12 +2310,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
......@@ -2310,7 +2324,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else {
init = true;
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2324,14 +2338,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, piRow, piSchema);
} else {
init = true;
code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2346,13 +2359,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, pRow, pSchema);
} else {
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2360,13 +2372,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else {
if (minKey == k.ts) {
init = true;
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2377,14 +2388,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, piRow, piSchema);
} else {
init = true;
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2396,7 +2405,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else {
init = true;
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2407,7 +2416,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2417,7 +2426,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
tsdbRowMergerAdd(&merge, &fRow, NULL);
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
......@@ -2598,15 +2607,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
SRowMerger merge = {0};
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2614,7 +2621,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(&pReader->status.merger);
return code;
}
}
......@@ -3685,8 +3692,9 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
}
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader) {
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger;
while (1) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
......@@ -3765,10 +3773,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return code;
}
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger) {
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
......@@ -3847,7 +3855,6 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
}
}
SRowMerger merge = {0};
terrno = 0;
int32_t code = 0;
......@@ -3859,8 +3866,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema;
code = tsdbRowMergerInit(&merge, ps, &current, pTSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3870,28 +3876,28 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1);
} else { // let's merge rows in file block
code = tsdbRowMergerInit(&merge, NULL, &current, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, pNextRow, NULL);
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL);
}
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResRow->type = TSDBROW_ROW_FMT;
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(&pReader->status.merger);
*freeTSRow = true;
return TSDB_CODE_SUCCESS;
......@@ -3899,7 +3905,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
SRow** pTSRow) {
SRowMerger merge = {0};
SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
......@@ -3914,46 +3920,43 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
}
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, pRow, pSchema);
tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema);
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code;
}
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
tsdbRowMergerClear(&merge);
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
tsdbRowMergerClear_rv(pMerger);
return code;
}
......@@ -4334,6 +4337,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
}
if (pReader->pSchema != NULL) {
tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader %s", pReader->idStr);
......@@ -4483,6 +4490,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
tsdbRowMergerCleanup_rv(&pReader->status.merger);
taosMemoryFree(pReader->pSchema);
tSimpleHashCleanup(pReader->pSchemaMap);
......
......@@ -778,58 +778,40 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
pMerger->version = key.version;
return code;
}
/*
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn;
pMerger->pTSchema = pTSchema;
pMerger->version = key.version;
pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
pMerger->pTSchema = pSchema;
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
if (pMerger->pArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
}
// ts
pTColumn = &pTSchema->columns[0];
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
// other
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
taosArrayClear(pMerger->pArray);
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
_exit:
return code;
taosArrayDestroy(pMerger->pArray);
}
*/
void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册