提交 09628401 编写于 作者: H Hongze Cheng

refact code

上级 f2ff17da
......@@ -54,6 +54,7 @@ target_sources(
"src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbDataIter.c"
# tq
"src/tq/tq.c"
......
......@@ -69,6 +69,8 @@ typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SBlkInfo SBlkInfo;
typedef struct STsdbDataIter2 STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo;
#define TSDBROW_ROW_FMT ((int8_t)0x0)
#define TSDBROW_COL_FMT ((int8_t)0x1)
......@@ -279,6 +281,7 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData);
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
// SDelFWriter
......@@ -310,6 +313,25 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
int32_t tDiskDataBuilderClear(SDiskDataBuilder *pBuilder);
int32_t tDiskDataAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId);
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, const SBlkInfo **ppBlkInfo);
// tsdbDataIter.c ==============================================================================================
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER 2
#define TSDB_TOMB_FILE_DATA_ITER 3
#define TSDB_FILTER_FLAG_BY_VERSION 0x1
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2 *)(((char *)pNode) - offsetof(STsdbDataIter2, rbtn)))
/* open */
int32_t tsdbOpenDataFileDataIter(SDataFReader *pReader, STsdbDataIter2 **ppIter);
int32_t tsdbOpenSttFileDataIter(SDataFReader *pReader, int32_t iStt, STsdbDataIter2 **ppIter);
int32_t tsdbOpenTombFileDataIter(SDelFReader *pReader, STsdbDataIter2 **ppIter);
/* close */
void tsdbCloseDataIter2(STsdbDataIter2 *pIter);
/* cmpr */
int32_t tsdbDataIterCmprFn(const SRBTreeNode *pNode1, const SRBTreeNode *pNode2);
/* next */
int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo);
// structs =======================
struct STsdbFS {
......@@ -830,6 +852,62 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
typedef struct {
int64_t suid;
int64_t uid;
SDelData delData;
} SDelInfo;
struct STsdbDataIter2 {
STsdbDataIter2 *next;
SRBTreeNode rbtn;
int32_t type;
SRowInfo rowInfo;
SDelInfo delInfo;
union {
// TSDB_MEM_TABLE_DATA_ITER
struct {
SMemTable *pMemTable;
} mIter;
// TSDB_DATA_FILE_DATA_ITER
struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk;
SBlockData bData;
int32_t iBlockIdx;
int32_t iDataBlk;
int32_t iRow;
} dIter;
// TSDB_STT_FILE_DATA_ITER
struct {
SDataFReader *pReader;
int32_t iStt;
SArray *aSttBlk;
SBlockData bData;
int32_t iSttBlk;
int32_t iRow;
} sIter;
// TSDB_TOMB_FILE_DATA_ITER
struct {
SDelFReader *pReader;
SArray *aDelIdx;
SArray *aDelData;
int32_t iDelIdx;
int32_t iDelData;
} tIter;
};
};
struct STsdbFilterInfo {
int32_t flag;
int64_t sver;
int64_t ever;
};
#ifdef __cplusplus
}
#endif
......
......@@ -14,6 +14,7 @@
*/
#include "tsdb.h"
#if 0
#define TSDB_ITER_TYPE_MEM 0x0
#define TSDB_ITER_TYPE_DAT 0x1
......@@ -943,11 +944,13 @@ _exit:
tsdbCloseCompactor(pCompactor);
return code;
}
#endif
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
int32_t code = 0;
int32_t lino = 0;
#if 0
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
// begin compact
......@@ -977,5 +980,6 @@ _exit:
tsdbCommitCompact(pCompactor);
}
tsdbEndCompact(pCompactor);
#endif
return code;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
/* open */
int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_DATA_FILE_DATA_ITER;
pIter->dIter.pReader = pReader;
if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iBlockIdx = 0;
pIter->dIter.iDataBlk = 0;
pIter->dIter.iRow = 0;
// read data
code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
tBlockDataDestroy(&pIter->dIter.bData);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_STT_FILE_DATA_ITER;
pIter->sIter.pReader = pReader;
pIter->sIter.iStt = iStt;
pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->sIter.aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iSttBlk = 0;
pIter->sIter.iRow = 0;
// read data
code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
taosArrayDestroy(pIter->sIter.aSttBlk);
tBlockDataDestroy(&pIter->sIter.bData);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_TOMB_FILE_DATA_ITER;
pIter->tIter.pReader = pReader;
if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear;
pIter->tIter.iDelIdx = 0;
pIter->tIter.iDelData = 0;
_exit:
if (code) {
if (pIter) {
_clear:
taosArrayDestroy(pIter->tIter.aDelIdx);
taosArrayDestroy(pIter->tIter.aDelData);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
/* close */
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->dIter.bData);
tMapDataClear(&pIter->dIter.mDataBlk);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
}
static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->sIter.bData);
taosArrayDestroy(pIter->sIter.aSttBlk);
taosMemoryFree(pIter);
}
static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) {
taosArrayDestroy(pIter->tIter.aDelData);
taosArrayDestroy(pIter->tIter.aDelIdx);
taosMemoryFree(pIter);
}
void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
tsdbCloseDataFileDataIter(pIter);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
tsdbCloseSttFileDataIter(pIter);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
tsdbCloseTombFileDataIter(pIter);
} else {
ASSERT(0);
}
}
/* cmpr */
int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}
/* seek */
/* iter next */
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
pIter->dIter.iRow++;
continue;
}
}
}
pIter->rowInfo.suid = pIter->dIter.bData.suid;
pIter->rowInfo.uid = pIter->dIter.bData.uid;
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
pIter->dIter.iRow++;
goto _exit;
}
for (;;) {
while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
// filter
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
pIter->dIter.iDataBlk++;
continue;
}
}
}
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iDataBlk++;
pIter->dIter.iRow = 0;
break;
}
if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break;
for (;;) {
if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iBlockIdx++;
pIter->dIter.iDataBlk = 0;
break;
} else {
pIter->rowInfo = (SRowInfo){0};
goto _exit;
}
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
pIter->sIter.iRow++;
continue;
}
}
}
pIter->rowInfo.suid = pIter->sIter.bData.suid;
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
pIter->sIter.iRow++;
goto _exit;
}
for (;;) {
if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
pIter->sIter.iSttBlk++;
continue;
}
}
}
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iRow = 0;
pIter->sIter.iSttBlk++;
break;
} else {
pIter->rowInfo = (SRowInfo){0};
goto _exit;
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) {
SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) {
pIter->tIter.iDelData++;
continue;
}
}
}
pIter->delInfo.delData = *pDelData;
pIter->tIter.iDelData++;
goto _exit;
}
for (;;) {
if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) {
SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx);
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->delInfo.suid = pDelIdx->suid;
pIter->delInfo.uid = pDelIdx->uid;
pIter->tIter.iDelData = 0;
pIter->tIter.iDelIdx++;
break;
} else {
pIter->delInfo = (SDelInfo){0};
goto _exit;
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
return code;
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
return tsdbDataFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
return tsdbSttFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
return tsdbTombFileDataIterNext(pIter, pFilterInfo);
} else {
ASSERT(0);
return code;
}
}
/* get */
......@@ -15,457 +15,10 @@
#include "tsdb.h"
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg);
extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg);
// STsdbDataIter2 ========================================
#define TSDB_MEM_TABLE_DATA_ITER 0
#define TSDB_DATA_FILE_DATA_ITER 1
#define TSDB_STT_FILE_DATA_ITER 2
#define TSDB_TOMB_FILE_DATA_ITER 3
typedef struct STsdbDataIter2 STsdbDataIter2;
typedef struct STsdbFilterInfo STsdbFilterInfo;
typedef struct {
int64_t suid;
int64_t uid;
SDelData delData;
} SDelInfo;
struct STsdbDataIter2 {
STsdbDataIter2* next;
SRBTreeNode rbtn;
int32_t type;
SRowInfo rowInfo;
SDelInfo delInfo;
union {
// TSDB_MEM_TABLE_DATA_ITER
struct {
SMemTable* pMemTable;
} mIter;
// TSDB_DATA_FILE_DATA_ITER
struct {
SDataFReader* pReader;
SArray* aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk;
SBlockData bData;
int32_t iBlockIdx;
int32_t iDataBlk;
int32_t iRow;
} dIter;
// TSDB_STT_FILE_DATA_ITER
struct {
SDataFReader* pReader;
int32_t iStt;
SArray* aSttBlk;
SBlockData bData;
int32_t iSttBlk;
int32_t iRow;
} sIter;
// TSDB_TOMB_FILE_DATA_ITER
struct {
SDelFReader* pReader;
SArray* aDelIdx;
SArray* aDelData;
int32_t iDelIdx;
int32_t iDelData;
} tIter;
};
};
#define TSDB_FILTER_FLAG_BY_VERSION 0x1
struct STsdbFilterInfo {
int32_t flag;
int64_t sver;
int64_t ever;
};
#define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2*)(((char*)pNode) - offsetof(STsdbDataIter2, rbtn)))
/* open */
static int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_DATA_FILE_DATA_ITER;
pIter->dIter.pReader = pReader;
if ((pIter->dIter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iBlockIdx = 0;
pIter->dIter.iDataBlk = 0;
pIter->dIter.iRow = 0;
// read data
code = tsdbReadBlockIdx(pReader, pIter->dIter.aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->dIter.aBlockIdx) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
tBlockDataDestroy(&pIter->dIter.bData);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
static int32_t tsdbOpenSttFileDataIter(SDataFReader* pReader, int32_t iStt, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
// create handle
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_STT_FILE_DATA_ITER;
pIter->sIter.pReader = pReader;
pIter->sIter.iStt = iStt;
pIter->sIter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->sIter.aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataCreate(&pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iSttBlk = 0;
pIter->sIter.iRow = 0;
// read data
code = tsdbReadSttBlk(pReader, iStt, pIter->sIter.aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->sIter.aSttBlk) == 0) goto _clear;
_exit:
if (code) {
if (pIter) {
_clear:
taosArrayDestroy(pIter->sIter.aSttBlk);
tBlockDataDestroy(&pIter->sIter.bData);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
static int32_t tsdbOpenTombFileDataIter(SDelFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
int32_t lino = 0;
STsdbDataIter2* pIter = (STsdbDataIter2*)taosMemoryCalloc(1, sizeof(*pIter));
if (pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pIter->type = TSDB_TOMB_FILE_DATA_ITER;
pIter->tIter.pReader = pReader;
if ((pIter->tIter.aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if ((pIter->tIter.aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbReadDelIdx(pReader, pIter->tIter.aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pIter->tIter.aDelIdx) == 0) goto _clear;
pIter->tIter.iDelIdx = 0;
pIter->tIter.iDelData = 0;
_exit:
if (code) {
if (pIter) {
_clear:
taosArrayDestroy(pIter->tIter.aDelIdx);
taosArrayDestroy(pIter->tIter.aDelData);
taosMemoryFree(pIter);
pIter = NULL;
}
}
*ppIter = pIter;
return code;
}
/* close */
static void tsdbCloseDataFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->dIter.bData);
tMapDataClear(&pIter->dIter.mDataBlk);
taosArrayDestroy(pIter->dIter.aBlockIdx);
taosMemoryFree(pIter);
}
static void tsdbCloseSttFileDataIter(STsdbDataIter2* pIter) {
tBlockDataDestroy(&pIter->sIter.bData);
taosArrayDestroy(pIter->sIter.aSttBlk);
taosMemoryFree(pIter);
}
static void tsdbCloseTombFileDataIter(STsdbDataIter2* pIter) {
taosArrayDestroy(pIter->tIter.aDelData);
taosArrayDestroy(pIter->tIter.aDelIdx);
taosMemoryFree(pIter);
}
static void tsdbCloseDataIter2(STsdbDataIter2* pIter) {
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
tsdbCloseDataFileDataIter(pIter);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
tsdbCloseSttFileDataIter(pIter);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
tsdbCloseTombFileDataIter(pIter);
} else {
ASSERT(0);
}
}
/* cmpr */
static int32_t tsdbDataIterCmprFn(const SRBTreeNode* pNode1, const SRBTreeNode* pNode2) {
STsdbDataIter2* pIter1 = TSDB_RBTN_TO_DATA_ITER(pNode1);
STsdbDataIter2* pIter2 = TSDB_RBTN_TO_DATA_ITER(pNode2);
return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}
/* seek */
/* iter next */
static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->dIter.iRow < pIter->dIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver ||
pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) {
pIter->dIter.iRow++;
continue;
}
}
}
pIter->rowInfo.suid = pIter->dIter.bData.suid;
pIter->rowInfo.uid = pIter->dIter.bData.uid;
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow);
pIter->dIter.iRow++;
goto _exit;
}
for (;;) {
while (pIter->dIter.iDataBlk < pIter->dIter.mDataBlk.nItem) {
SDataBlk dataBlk;
tMapDataGetItemByIdx(&pIter->dIter.mDataBlk, pIter->dIter.iDataBlk, &dataBlk, tGetDataBlk);
// filter
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) {
pIter->dIter.iDataBlk++;
continue;
}
}
}
code = tsdbReadDataBlockEx(pIter->dIter.pReader, &dataBlk, &pIter->dIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iDataBlk++;
pIter->dIter.iRow = 0;
break;
}
if (pIter->dIter.iRow < pIter->dIter.bData.nRow) break;
for (;;) {
if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) {
SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx);
code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->dIter.iBlockIdx++;
pIter->dIter.iDataBlk = 0;
break;
} else {
pIter->rowInfo = (SRowInfo){0};
goto _exit;
}
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->sIter.iRow < pIter->sIter.bData.nRow) {
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] ||
pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) {
pIter->sIter.iRow++;
continue;
}
}
}
pIter->rowInfo.suid = pIter->sIter.bData.suid;
pIter->rowInfo.uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow];
pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->sIter.bData, pIter->sIter.iRow);
pIter->sIter.iRow++;
goto _exit;
}
for (;;) {
if (pIter->sIter.iSttBlk < taosArrayGetSize(pIter->sIter.aSttBlk)) {
SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) {
pIter->sIter.iSttBlk++;
continue;
}
}
}
code = tsdbReadSttBlockEx(pIter->sIter.pReader, pIter->sIter.iStt, pSttBlk, &pIter->sIter.bData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->sIter.iRow = 0;
pIter->sIter.iSttBlk++;
break;
} else {
pIter->rowInfo = (SRowInfo){0};
goto _exit;
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
int32_t lino = 0;
for (;;) {
while (pIter->tIter.iDelData < taosArrayGetSize(pIter->tIter.aDelData)) {
SDelData* pDelData = taosArrayGet(pIter->tIter.aDelData, pIter->tIter.iDelData);
if (pFilterInfo) {
if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) {
if (pFilterInfo->sver > pDelData->version || pFilterInfo->ever < pDelData->version) {
pIter->tIter.iDelData++;
continue;
}
}
}
pIter->delInfo.delData = *pDelData;
pIter->tIter.iDelData++;
goto _exit;
}
for (;;) {
if (pIter->tIter.iDelIdx < taosArrayGetSize(pIter->tIter.aDelIdx)) {
SDelIdx* pDelIdx = taosArrayGet(pIter->tIter.aDelIdx, pIter->tIter.iDelIdx);
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
pIter->delInfo.suid = pDelIdx->suid;
pIter->delInfo.uid = pDelIdx->uid;
pIter->tIter.iDelData = 0;
pIter->tIter.iDelIdx++;
break;
} else {
pIter->delInfo = (SDelInfo){0};
goto _exit;
}
}
}
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
int32_t code = 0;
if (pIter->type == TSDB_MEM_TABLE_DATA_ITER) {
ASSERT(0);
return code;
} else if (pIter->type == TSDB_DATA_FILE_DATA_ITER) {
return tsdbDataFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_STT_FILE_DATA_ITER) {
return tsdbSttFileDataIterNext(pIter, pFilterInfo);
} else if (pIter->type == TSDB_TOMB_FILE_DATA_ITER) {
return tsdbTombFileDataIterNext(pIter, pFilterInfo);
} else {
ASSERT(0);
return code;
}
}
/* get */
// STsdbSnapReader ========================================
struct STsdbSnapReader {
STsdb* pTsdb;
......@@ -1318,7 +871,7 @@ static int32_t tsdbSnapWriteFileDataStart(STsdbSnapWriter* pWriter, int32_t fid)
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->pSIter) {
code = tsdbSttFileDataIterNext(pWriter->pSIter, NULL);
code = tsdbDataIterNext2(pWriter->pSIter, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
// add to tree
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册