提交 a73957aa 编写于 作者: H Haojun Liao

fix(query): fix error in add row into merger.

上级 37d042a3
...@@ -1954,7 +1954,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* ...@@ -1954,7 +1954,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0}; SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -2013,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2013,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(&pReader->status.merger, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
...@@ -2021,7 +2021,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2021,7 +2021,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr);
} }
if (minKey == k.ts) { if (minKey == k.ts) {
...@@ -2030,7 +2030,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2030,7 +2030,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno; return terrno;
} }
if (init) { if (init) {
tsdbRowMergerAdd(&merge, pRow, pSchema); tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
...@@ -2057,7 +2057,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2057,7 +2057,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code; return code;
} }
} }
...@@ -2065,7 +2065,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2065,7 +2065,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(pMerger, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema);
...@@ -2073,12 +2073,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2073,12 +2073,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
if (init) { if (init) {
tsdbRowMergerAdd(&merge, &fRow, NULL); tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
...@@ -2090,7 +2090,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2090,7 +2090,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow); int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2098,7 +2098,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2098,7 +2098,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear_rv(pMerger);
return code; return code;
} }
...@@ -2106,12 +2106,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2106,12 +2106,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) { bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false; bool copied = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr); tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
...@@ -2132,10 +2132,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2132,10 +2132,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
} }
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(pMerger, &fRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2143,7 +2143,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2143,7 +2143,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2155,14 +2155,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2155,14 +2155,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
// merge with block data if ts == key // merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
} }
code = tsdbRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2170,7 +2170,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2170,7 +2170,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2237,7 +2237,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -2237,7 +2237,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) { SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0}; SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -2310,7 +2310,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2310,7 +2310,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
init = true; init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2321,24 +2321,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2321,24 +2321,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(pMerger, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); code = tsdbRowMergerAdd(pMerger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tsdbRowMergerAdd(&merge, piRow, piSchema); tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else { } else {
init = true; init = true;
code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2352,14 +2352,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2352,14 +2352,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
if (init) { if (init) {
if (merge.pTSchema == NULL) { tsdbRowMergerAdd(pMerger, pRow, pSchema);
return code;
}
tsdbRowMergerAdd(&merge, pRow, pSchema);
} else { } else {
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); // STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2372,7 +2368,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2372,7 +2368,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else { } else {
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2385,10 +2381,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2385,10 +2381,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tsdbRowMergerAdd(&merge, piRow, piSchema); tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else { } else {
init = true; init = true;
code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2402,39 +2398,32 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2402,39 +2398,32 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(pMerger, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow1, pReader->pSchema); code = tsdbRowMergerAdd(pMerger, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) { if (!init) {
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema); code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} else { } else {
if (merge.pTSchema == NULL) { tsdbRowMergerAdd(pMerger, &fRow, NULL);
return code;
}
tsdbRowMergerAdd(&merge, &fRow, NULL);
} }
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
} }
} }
if (merge.pTSchema == NULL) { code = tsdbRowMergerGetRow(pMerger, &pTSRow);
return code;
}
code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2442,7 +2431,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2442,7 +2431,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear_rv(pMerger);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册