提交 932116e4 编写于 作者: X Xiaoyu Wang

merge refact/submit_req

...@@ -87,7 +87,8 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData); ...@@ -87,7 +87,8 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow); int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow);
void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag); void tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
// SRowIter ================================ // SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
......
...@@ -527,7 +527,7 @@ static int32_t tRowPCmprFn(const void *p1, const void *p2) { ...@@ -527,7 +527,7 @@ static int32_t tRowPCmprFn(const void *p1, const void *p2) {
return 0; return 0;
} }
static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); } static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); }
static int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) { static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) {
int32_t code = 0; int32_t code = 0;
int32_t nRow = iEnd - iStart; int32_t nRow = iEnd - iStart;
...@@ -591,12 +591,14 @@ _exit: ...@@ -591,12 +591,14 @@ _exit:
if (code) tRowDestroy(pRow); if (code) tRowDestroy(pRow);
return code; return code;
} }
int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
if (aRowP->size <= 1) return 0;
int32_t code = 0;
void tRowSort(SArray *aRowP) {
if (TARRAY_SIZE(aRowP) <= 1) return;
taosArraySort(aRowP, tRowPCmprFn); taosArraySort(aRowP, tRowPCmprFn);
}
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
int32_t code = 0;
int32_t iStart = 0; int32_t iStart = 0;
while (iStart < aRowP->size) { while (iStart < aRowP->size) {
...@@ -612,7 +614,7 @@ int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) { ...@@ -612,7 +614,7 @@ int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
} }
if (iEnd - iStart > 1) { if (iEnd - iStart > 1) {
code = tRowMerge(aRowP, pTSchema, iStart, iEnd, flag); code = tRowMergeImpl(aRowP, pTSchema, iStart, iEnd, flag);
} }
// the array is also changing, so the iStart just ++ instead of iEnd // the array is also changing, so the iStart just ++ instead of iEnd
...@@ -2186,7 +2188,6 @@ int32_t tGetColData(uint8_t *pBuf, SColData *pColData) { ...@@ -2186,7 +2188,6 @@ int32_t tGetColData(uint8_t *pBuf, SColData *pColData) {
n += pColData->nData; n += pColData->nData;
} else { } else {
pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal; pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal;
pColData->nData = pBuf + n;
n += pColData->nData; n += pColData->nData;
} }
} }
......
...@@ -115,13 +115,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2); ...@@ -115,13 +115,13 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2);
void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tRowMergerClear(SRowMerger *pMerger); void tsdbRowMergerClear(SRowMerger *pMerger);
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
// TABLEID // TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY // TSDBKEY
......
...@@ -261,12 +261,12 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SR ...@@ -261,12 +261,12 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SR
SRowMerger merger = {0}; SRowMerger merger = {0};
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
tRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema); tsdbRowMergerInit(&merger, &tsdbRowFromTSRow(0, cacheRow), pTSchema);
tRowMerge(&merger, &tsdbRowFromTSRow(1, row)); tsdbRowMerge(&merger, &tsdbRowFromTSRow(1, row));
tRowMergerGetRow(&merger, &mergedRow); tsdbRowMergerGetRow(&merger, &mergedRow);
tRowMergerClear(&merger); tsdbRowMergerClear(&merger);
taosMemoryFreeClear(pTSchema); taosMemoryFreeClear(pTSchema);
......
...@@ -1777,7 +1777,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1777,7 +1777,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) { if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) { if (minKey == key) {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1787,10 +1787,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1787,10 +1787,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1800,11 +1800,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1800,11 +1800,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
if (init) { if (init) {
tRowMerge(&merge, pRow); tsdbRowMerge(&merge, pRow);
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1818,7 +1818,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1818,7 +1818,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1832,10 +1832,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1832,10 +1832,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1845,10 +1845,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1845,10 +1845,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
if (init) { if (init) {
tRowMerge(&merge, &fRow); tsdbRowMerge(&merge, &fRow);
} else { } else {
init = true; init = true;
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1857,7 +1857,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1857,7 +1857,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
int32_t code = tRowMergerGetRow(&merge, &pTSRow); int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1865,7 +1865,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1865,7 +1865,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1886,16 +1886,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1886,16 +1886,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock; pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1903,10 +1903,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1903,10 +1903,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
} }
} else { // not merge block data } else { // not merge block data
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1919,7 +1919,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1919,7 +1919,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
code = tRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1927,7 +1927,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1927,7 +1927,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1955,7 +1955,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1955,7 +1955,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1963,11 +1963,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1963,11 +1963,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1975,7 +1975,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1975,7 +1975,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return code; return code;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -2056,7 +2056,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2056,7 +2056,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
init = true; init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tRowMergerInit(&merge, &fRow, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2067,10 +2067,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2067,10 +2067,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2081,7 +2081,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2081,7 +2081,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tRowMerge(&merge, piRow); tsdbRowMerge(&merge, piRow);
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
...@@ -2089,7 +2089,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2089,7 +2089,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
code = tRowMergerInit(&merge, piRow, pSchema); code = tsdbRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2108,10 +2108,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2108,10 +2108,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
tRowMerge(&merge, pRow); tsdbRowMerge(&merge, pRow);
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tRowMergerInit(&merge, pRow, pSchema); code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2126,7 +2126,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2126,7 +2126,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tRowMergerInit(&merge, pRow, pSchema); code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2140,11 +2140,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2140,11 +2140,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) { if (minKey == ik.ts) {
if (init) { if (init) {
tRowMerge(&merge, piRow); tsdbRowMerge(&merge, piRow);
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
code = tRowMergerInit(&merge, piRow, pSchema); code = tsdbRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2159,10 +2159,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2159,10 +2159,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) { if (init) {
tRowMerge(&merge, &fRow1); tsdbRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2173,7 +2173,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2173,7 +2173,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) { if (!init) {
code = tRowMergerInit(&merge, &fRow, pReader->pSchema); code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2181,7 +2181,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2181,7 +2181,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (merge.pTSchema == NULL) { if (merge.pTSchema == NULL) {
return code; return code;
} }
tRowMerge(&merge, &fRow); tsdbRowMerge(&merge, &fRow);
} }
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
...@@ -2191,7 +2191,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2191,7 +2191,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
code = tRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2199,7 +2199,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2199,7 +2199,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return code; return code;
} }
...@@ -2356,13 +2356,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc ...@@ -2356,13 +2356,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
code = tRowMergerGetRow(&merge, &pTSRow); code = tsdbRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2370,7 +2370,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc ...@@ -2370,7 +2370,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -3232,7 +3232,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe ...@@ -3232,7 +3232,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
return terrno; return terrno;
} }
tRowMergerAdd(pMerger, pRow, pTSchema); tsdbRowMergerAdd(pMerger, pRow, pTSchema);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3247,7 +3247,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd ...@@ -3247,7 +3247,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd
} }
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow); tsdbRowMerge(pMerger, &fRow);
rowIndex += step; rowIndex += step;
} }
...@@ -3344,7 +3344,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc ...@@ -3344,7 +3344,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(pMerger, &fRow1); tsdbRowMerge(pMerger, &fRow1);
} else { } else {
break; break;
} }
...@@ -3394,7 +3394,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3394,7 +3394,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
pReader->pSchema = pTSchema; pReader->pSchema = pTSchema;
} }
int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema); int32_t code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3404,19 +3404,19 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3404,19 +3404,19 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno; return terrno;
} }
tRowMergerAdd(&merge, pNextRow, pTSchema1); tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = tRowMergerGetRow(&merge, pTSRow); code = tsdbRowMergerGetRow(&merge, pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
tRowMergerClear(&merge); tsdbRowMergerClear(&merge);
*freeTSRow = true; *freeTSRow = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3431,7 +3431,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3431,7 +3431,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, piRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3442,7 +3442,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3442,7 +3442,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
tRowMerge(&merge, pRow); tsdbRowMerge(&merge, pRow);
code = code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3452,7 +3452,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3452,7 +3452,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
return code; return code;
} }
...@@ -3463,7 +3463,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3463,7 +3463,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
tRowMerge(&merge, piRow); tsdbRowMerge(&merge, piRow);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader); pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3471,7 +3471,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3471,7 +3471,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
} }
} }
int32_t code = tRowMergerGetRow(&merge, pTSRow); int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
return code; return code;
} }
......
...@@ -642,7 +642,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { ...@@ -642,7 +642,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
// SRowMerger ====================================================== // SRowMerger ======================================================
int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
...@@ -697,7 +697,7 @@ _exit: ...@@ -697,7 +697,7 @@ _exit:
return code; return code;
} }
int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
...@@ -736,7 +736,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { ...@@ -736,7 +736,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
return code; return code;
} }
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
...@@ -775,9 +775,9 @@ _exit: ...@@ -775,9 +775,9 @@ _exit:
return code; return code;
} }
void tRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); } void tsdbRowMergerClear(SRowMerger *pMerger) { taosArrayDestroy(pMerger->pArray); }
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow) { int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
...@@ -807,7 +807,7 @@ _exit: ...@@ -807,7 +807,7 @@ _exit:
return code; return code;
} }
int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
} }
......
...@@ -1197,8 +1197,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) { ...@@ -1197,8 +1197,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
while (TSDB_CODE_SUCCESS == code && NULL != p) { while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p; STableDataCxt* pTableCxt = *(STableDataCxt**)p;
if (pTableCxt->ordered) { if (pTableCxt->ordered) {
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); tRowSort(pTableCxt->pData->aRowP);
} }
code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SVgroupDataCxt* pVgCxt = NULL; SVgroupDataCxt* pVgCxt = NULL;
int32_t vgId = pTableCxt->pMeta->vgId; int32_t vgId = pTableCxt->pMeta->vgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册