提交 1ff8511e 编写于 作者: M Minglei Jin

tsdb/row merger: comment init out

上级 55f7e41f
...@@ -123,11 +123,11 @@ int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) ...@@ -123,11 +123,11 @@ int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema)
void tsdbRowClose(STSDBRowIter *pIter); void tsdbRowClose(STSDBRowIter *pIter);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter); SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger // SRowMerger
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); // int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger); void tsdbRowMergerClear(SRowMerger *pMerger);
// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow); // int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow); int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
// TABLEID // TABLEID
......
...@@ -1969,7 +1969,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1969,7 +1969,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) { if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) { if (minKey == key) {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1982,7 +1982,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1982,7 +1982,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1999,7 +1999,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1999,7 +1999,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, pRow, pSchema); tsdbRowMergerAdd(&merge, pRow, pSchema);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2013,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2013,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2030,7 +2030,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2030,7 +2030,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2043,7 +2043,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2043,7 +2043,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow, NULL); tsdbRowMergerAdd(&merge, &fRow, NULL);
} else { } else {
init = true; init = true;
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2088,7 +2088,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2088,7 +2088,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock; pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2112,7 +2112,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2112,7 +2112,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
} }
} }
} else { // not merge block data } else { // not merge block data
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2164,7 +2164,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -2164,7 +2164,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2263,7 +2263,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2263,7 +2263,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
init = true; init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2277,7 +2277,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2277,7 +2277,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2296,7 +2296,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2296,7 +2296,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
code = tsdbRowMergerInit(&merge, piRow, pSchema); code = tsdbRowMergerInit(&merge, NULL, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2318,7 +2318,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2318,7 +2318,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, pRow, NULL); tsdbRowMergerAdd(&merge, pRow, NULL);
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pRow, pSchema); code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2333,7 +2333,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2333,7 +2333,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pRow, pSchema); code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2351,7 +2351,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2351,7 +2351,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, piRow, pSchema); code = tsdbRowMergerInit(&merge, NULL, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2369,7 +2369,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2369,7 +2369,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(&merge, &fRow1, NULL); tsdbRowMergerAdd(&merge, &fRow1, NULL);
} else { } else {
init = true; init = true;
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema); code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2380,7 +2380,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2380,7 +2380,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) { if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) { if (!init) {
code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2573,7 +2573,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc ...@@ -2573,7 +2573,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3844,7 +3844,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3844,7 +3844,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
pReader->pSchema = pTSchema; pReader->pSchema = pTSchema;
} }
code = tsdbRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema); code = tsdbRowMergerInit(&merge, pReader->pSchema, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3856,7 +3856,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3856,7 +3856,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1); tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
} else { // let's merge rows in file block } else { // let's merge rows in file block
code = tsdbRowMergerInit(&merge, &current, pReader->pSchema); code = tsdbRowMergerInit(&merge, NULL, &current, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3892,7 +3892,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3892,7 +3892,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema); int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -3913,7 +3913,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3913,7 +3913,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
return code; return code;
} }
......
...@@ -638,13 +638,17 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { ...@@ -638,13 +638,17 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
// SRowMerger ====================================================== // SRowMerger ======================================================
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn; STColumn *pTColumn;
int32_t iCol, jCol = 0; int32_t iCol, jCol = 0;
if (NULL == pResTSchema) {
pResTSchema = pTSchema;
}
pMerger->pTSchema = pResTSchema; pMerger->pTSchema = pResTSchema;
pMerger->version = key.version; pMerger->version = key.version;
...@@ -774,7 +778,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) ...@@ -774,7 +778,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
pMerger->version = key.version; pMerger->version = key.version;
return code; return code;
} }
/*
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
...@@ -825,7 +829,7 @@ int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema ...@@ -825,7 +829,7 @@ int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema
_exit: _exit:
return code; return code;
} }
*/
void tsdbRowMergerClear(SRowMerger *pMerger) { void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) { for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册