提交 ff4fda84 编写于 作者: H Hongze Cheng

refact code

上级 7d23c830
...@@ -43,7 +43,7 @@ typedef struct STbDataIter STbDataIter; ...@@ -43,7 +43,7 @@ typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData; typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx; typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock; typedef struct SBlock SBlock;
typedef struct SBlockL SBlockL; typedef struct SSstBlk SSstBlk;
typedef struct SColData SColData; typedef struct SColData SColData;
typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SDiskDataHdr SDiskDataHdr;
typedef struct SBlockData SBlockData; typedef struct SBlockData SBlockData;
...@@ -120,9 +120,9 @@ int32_t tPutBlock(uint8_t *p, void *ph); ...@@ -120,9 +120,9 @@ int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2); int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock); bool tBlockHasSma(SBlock *pBlock);
// SBlockL // SSstBlk
int32_t tPutBlockL(uint8_t *p, void *ph); int32_t tPutSstBlk(uint8_t *p, void *ph);
int32_t tGetBlockL(uint8_t *p, void *ph); int32_t tGetSstBlk(uint8_t *p, void *ph);
// SBlockIdx // SBlockIdx
int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph);
...@@ -254,7 +254,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); ...@@ -254,7 +254,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL); int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast); int8_t cmprAlg, int8_t toLast);
...@@ -264,10 +264,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS ...@@ -264,10 +264,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL); int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
// SDelFWriter // SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync);
...@@ -439,7 +439,7 @@ struct SBlock { ...@@ -439,7 +439,7 @@ struct SBlock {
SSmaInfo smaInfo; SSmaInfo smaInfo;
}; };
struct SBlockL { struct SSstBlk {
int64_t suid; int64_t suid;
int64_t minUid; int64_t minUid;
int64_t maxUid; int64_t maxUid;
......
...@@ -32,9 +32,9 @@ typedef struct { ...@@ -32,9 +32,9 @@ typedef struct {
STbDataIter iter; STbDataIter iter;
}; // memory data iter }; // memory data iter
struct { struct {
int32_t iLast; int32_t iSst;
SArray *aBlockL; SArray *aSstBlk;
int32_t iBlockL; int32_t iSstBlk;
SBlockData bData; SBlockData bData;
int32_t iRow; int32_t iRow;
}; // sst file data iter }; // sst file data iter
...@@ -77,7 +77,7 @@ typedef struct { ...@@ -77,7 +77,7 @@ typedef struct {
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aBlockL; // SArray<SBlockL> SArray *aSstBlk; // SArray<SSstBlk>
SMapData mBlock; // SMapData<SBlock> SMapData mBlock; // SMapData<SBlock>
SBlockData bData; SBlockData bData;
SBlockData bDatal; SBlockData bDatal;
...@@ -92,7 +92,7 @@ typedef struct { ...@@ -92,7 +92,7 @@ typedef struct {
SArray *aDelData; // SArray<SDelData> SArray *aDelData; // SArray<SDelData>
} SCommitter; } SCommitter;
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *aSstBlk,
SBlockData *pBlockData); // todo SBlockData *pBlockData); // todo
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
...@@ -433,19 +433,19 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -433,19 +433,19 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
if (pReader) { if (pReader) {
if (pReader->pSet->nSstF >= pCommitter->maxLast) { if (pReader->pSet->nSstF >= pCommitter->maxLast) {
int8_t iIter = 0; int8_t iIter = 0;
for (int32_t iLast = 0; iLast < pReader->pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
pIter = &pCommitter->aDataIter[iIter]; pIter = &pCommitter->aDataIter[iIter];
pIter->type = LAST_DATA_ITER; pIter->type = LAST_DATA_ITER;
pIter->iLast = iLast; pIter->iSst = iSst;
code = tsdbReadBlockL(pCommitter->dReader.pReader, iLast, pIter->aBlockL); code = tsdbReadSstBlk(pCommitter->dReader.pReader, iSst, pIter->aSstBlk);
if (code) goto _err; if (code) goto _err;
if (taosArrayGetSize(pIter->aBlockL) == 0) continue; if (taosArrayGetSize(pIter->aSstBlk) == 0) continue;
pIter->iBlockL = 0; pIter->iSstBlk = 0;
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0);
code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, iLast, pBlockL, &pIter->bData); code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData);
if (code) goto _err; if (code) goto _err;
pIter->iRow = 0; pIter->iRow = 0;
...@@ -457,8 +457,8 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -457,8 +457,8 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
iIter++; iIter++;
} }
} else { } else {
for (int32_t iLast = 0; iLast < pReader->pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) {
SSstFile *pSstFile = pReader->pSet->aSstF[iLast]; SSstFile *pSstFile = pReader->pSet->aSstF[iSst];
if (pSstFile->size > pSstFile->offset) { if (pSstFile->size > pSstFile->offset) {
pCommitter->toLastOnly = 1; pCommitter->toLastOnly = 1;
break; break;
...@@ -523,8 +523,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -523,8 +523,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
fSma = *pRSet->pSmaF; fSma = *pRSet->pSmaF;
wSet.diskId = pRSet->diskId; wSet.diskId = pRSet->diskId;
if (pRSet->nSstF < pCommitter->maxLast) { if (pRSet->nSstF < pCommitter->maxLast) {
for (int32_t iLast = 0; iLast < pRSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pRSet->nSstF; iSst++) {
wSet.aSstF[iLast] = pRSet->aSstF[iLast]; wSet.aSstF[iSst] = pRSet->aSstF[iSst];
} }
wSet.nSstF = pRSet->nSstF + 1; wSet.nSstF = pRSet->nSstF + 1;
} else { } else {
...@@ -542,7 +542,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -542,7 +542,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
if (code) goto _err; if (code) goto _err;
taosArrayClear(pCommitter->dWriter.aBlockIdx); taosArrayClear(pCommitter->dWriter.aBlockIdx);
taosArrayClear(pCommitter->dWriter.aBlockL); taosArrayClear(pCommitter->dWriter.aSstBlk);
tMapDataReset(&pCommitter->dWriter.mBlock); tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bData);
tBlockDataReset(&pCommitter->dWriter.bDatal); tBlockDataReset(&pCommitter->dWriter.bDatal);
...@@ -613,7 +613,7 @@ _err: ...@@ -613,7 +613,7 @@ _err:
static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SBlockL blockL; SSstBlk blockL;
SBlockData *pBlockData = &pCommitter->dWriter.bDatal; SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
ASSERT(pBlockData->nRow > 0); ASSERT(pBlockData->nRow > 0);
...@@ -638,8 +638,8 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { ...@@ -638,8 +638,8 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) {
code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1);
if (code) goto _err; if (code) goto _err;
// push SBlockL // push SSstBlk
if (taosArrayPush(pCommitter->dWriter.aBlockL, &blockL) == NULL) { if (taosArrayPush(pCommitter->dWriter.aSstBlk, &blockL) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -661,8 +661,8 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { ...@@ -661,8 +661,8 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
if (code) goto _err; if (code) goto _err;
// write aBlockL // write aSstBlk
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL); code = tsdbWriteSstBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSstBlk);
if (code) goto _err; if (code) goto _err;
// update file header // update file header
...@@ -790,10 +790,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -790,10 +790,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
if (code) goto _exit; if (code) goto _exit;
// merger // merger
for (int32_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) {
SDataIter *pIter = &pCommitter->aDataIter[iLast]; SDataIter *pIter = &pCommitter->aDataIter[iSst];
pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if (pIter->aBlockL == NULL) { if (pIter->aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -809,8 +809,8 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -809,8 +809,8 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
goto _exit; goto _exit;
} }
pCommitter->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); pCommitter->dWriter.aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if (pCommitter->dWriter.aBlockL == NULL) { if (pCommitter->dWriter.aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -832,15 +832,15 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -832,15 +832,15 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
tBlockDataDestroy(&pCommitter->dReader.bData, 1); tBlockDataDestroy(&pCommitter->dReader.bData, 1);
// merger // merger
for (int32_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { for (int32_t iSst = 0; iSst < TSDB_MAX_LAST_FILE; iSst++) {
SDataIter *pIter = &pCommitter->aDataIter[iLast]; SDataIter *pIter = &pCommitter->aDataIter[iSst];
taosArrayDestroy(pIter->aBlockL); taosArrayDestroy(pIter->aSstBlk);
tBlockDataDestroy(&pIter->bData, 1); tBlockDataDestroy(&pIter->bData, 1);
} }
// writer // writer
taosArrayDestroy(pCommitter->dWriter.aBlockIdx); taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
taosArrayDestroy(pCommitter->dWriter.aBlockL); taosArrayDestroy(pCommitter->dWriter.aSstBlk);
tMapDataClear(&pCommitter->dWriter.mBlock); tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataDestroy(&pCommitter->dWriter.bData, 1); tBlockDataDestroy(&pCommitter->dWriter.bData, 1);
tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1);
...@@ -1055,11 +1055,11 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { ...@@ -1055,11 +1055,11 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
} else { } else {
pIter->iBlockL++; pIter->iSstBlk++;
if (pIter->iBlockL < taosArrayGetSize(pIter->aBlockL)) { if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) {
SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
code = tsdbReadLastBlockEx(pCommitter->dReader.pReader, pIter->iLast, pBlockL, &pIter->bData); code = tsdbReadSstBlockEx(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData);
if (code) goto _exit; if (code) goto _exit;
pIter->iRow = 0; pIter->iRow = 0;
......
...@@ -255,8 +255,8 @@ void tsdbFSDestroy(STsdbFS *pFS) { ...@@ -255,8 +255,8 @@ void tsdbFSDestroy(STsdbFS *pFS) {
taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pSmaF); taosMemoryFree(pSet->pSmaF);
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
taosMemoryFree(pSet->aSstF[iLast]); taosMemoryFree(pSet->aSstF[iSst]);
} }
} }
...@@ -645,15 +645,15 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { ...@@ -645,15 +645,15 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
pDFileSet->nSstF++; pDFileSet->nSstF++;
} else if (pSet->nSstF < pDFileSet->nSstF) { } else if (pSet->nSstF < pDFileSet->nSstF) {
ASSERT(pSet->nSstF == 1); ASSERT(pSet->nSstF == 1);
for (int32_t iLast = 1; iLast < pDFileSet->nSstF; iLast++) { for (int32_t iSst = 1; iSst < pDFileSet->nSstF; iSst++) {
taosMemoryFree(pDFileSet->aSstF[iLast]); taosMemoryFree(pDFileSet->aSstF[iSst]);
} }
*pDFileSet->aSstF[0] = *pSet->aSstF[0]; *pDFileSet->aSstF[0] = *pSet->aSstF[0];
pDFileSet->nSstF = 1; pDFileSet->nSstF = 1;
} else { } else {
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
*pDFileSet->aSstF[iLast] = *pSet->aSstF[iLast]; *pDFileSet->aSstF[iSst] = *pSet->aSstF[iSst];
} }
} }
...@@ -876,15 +876,15 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -876,15 +876,15 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
pSetOld->nSstF++; pSetOld->nSstF++;
} else if (pSetNew->nSstF < pSetOld->nSstF) { } else if (pSetNew->nSstF < pSetOld->nSstF) {
ASSERT(pSetNew->nSstF == 1); ASSERT(pSetNew->nSstF == 1);
for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
SSstFile *pSstFile = pSetOld->aSstF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iSst];
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSstFile); taosMemoryFree(pSstFile);
} }
pSetOld->aSstF[iLast] = NULL; pSetOld->aSstF[iSst] = NULL;
} }
pSetOld->nSstF = 1; pSetOld->nSstF = 1;
...@@ -896,8 +896,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -896,8 +896,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
*pSetOld->aSstF[0] = *pSetNew->aSstF[0]; *pSetOld->aSstF[0] = *pSetNew->aSstF[0];
pSetOld->aSstF[0]->nRef = 1; pSetOld->aSstF[0]->nRef = 1;
} else { } else {
for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
SSstFile *pSstFile = pSetOld->aSstF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iSst];
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
...@@ -905,19 +905,19 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -905,19 +905,19 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
taosMemoryFree(pSstFile); taosMemoryFree(pSstFile);
} }
pSetOld->aSstF[iLast] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aSstF[iLast] == NULL) { if (pSetOld->aSstF[iSst] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aSstF[iLast] = *pSetNew->aSstF[iLast]; *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
pSetOld->aSstF[iLast]->nRef = 1; pSetOld->aSstF[iSst]->nRef = 1;
} }
} }
} else { } else {
ASSERT(pSetOld->nSstF == pSetNew->nSstF); ASSERT(pSetOld->nSstF == pSetNew->nSstF);
for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
SSstFile *pSstFile = pSetOld->aSstF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iSst];
nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
...@@ -925,13 +925,13 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -925,13 +925,13 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
taosMemoryFree(pSstFile); taosMemoryFree(pSstFile);
} }
pSetOld->aSstF[iLast] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aSstF[iLast] == NULL) { if (pSetOld->aSstF[iSst] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aSstF[iLast] = *pSetNew->aSstF[iLast]; *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst];
pSetOld->aSstF[iLast]->nRef = 1; pSetOld->aSstF[iSst]->nRef = 1;
} }
} }
...@@ -965,12 +965,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -965,12 +965,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
taosMemoryFree(pSetOld->pSmaF); taosMemoryFree(pSetOld->pSmaF);
} }
for (int8_t iLast = 0; iLast < pSetOld->nSstF; iLast++) { for (int8_t iSst = 0; iSst < pSetOld->nSstF; iSst++) {
nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iLast]->nRef, 1); nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iSst]->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iLast], fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iSst], fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSetOld->aSstF[iLast]); taosMemoryFree(pSetOld->aSstF[iSst]);
} }
} }
...@@ -1063,8 +1063,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1063,8 +1063,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
nRef = atomic_fetch_add_32(&pSet->aSstF[iLast]->nRef, 1); nRef = atomic_fetch_add_32(&pSet->aSstF[iSst]->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
} }
...@@ -1123,13 +1123,13 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1123,13 +1123,13 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
} }
// sst // sst
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
nRef = atomic_sub_fetch_32(&pSet->aSstF[iLast]->nRef, 1); nRef = atomic_sub_fetch_32(&pSet->aSstF[iSst]->nRef, 1);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iLast], fname); tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSet->aSstF[iLast]); taosMemoryFree(pSet->aSstF[iSst]);
/* code */ /* code */
} }
} }
......
...@@ -196,8 +196,8 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -196,8 +196,8 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
// sst // sst
n += tPutU8(p ? p + n : p, pSet->nSstF); n += tPutU8(p ? p + n : p, pSet->nSstF);
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
n += tPutSstFile(p ? p + n : p, pSet->aSstF[iLast]); n += tPutSstFile(p ? p + n : p, pSet->aSstF[iSst]);
} }
return n; return n;
...@@ -236,13 +236,13 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -236,13 +236,13 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// sst // sst
n += tGetU8(p + n, &pSet->nSstF); n += tGetU8(p + n, &pSet->nSstF);
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
pSet->aSstF[iLast] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile)); pSet->aSstF[iSst] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile));
if (pSet->aSstF[iLast] == NULL) { if (pSet->aSstF[iSst] == NULL) {
return -1; return -1;
} }
pSet->aSstF[iLast]->nRef = 1; pSet->aSstF[iSst]->nRef = 1;
n += tGetSstFile(p + n, pSet->aSstF[iLast]); n += tGetSstFile(p + n, pSet->aSstF[iSst]);
} }
return n; return n;
......
...@@ -18,12 +18,12 @@ ...@@ -18,12 +18,12 @@
// SLDataIter ================================================= // SLDataIter =================================================
typedef struct SLDataIter { typedef struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SBlockL *pBlockL; SSstBlk *pSstBlk;
SDataFReader *pReader; SDataFReader *pReader;
int32_t iLast; int32_t iSst;
int8_t backward; int8_t backward;
SArray *aBlockL; SArray *aSstBlk;
int32_t iBlockL; int32_t iSstBlk;
SBlockData bData; SBlockData bData;
int32_t iRow; int32_t iRow;
SRowInfo rInfo; SRowInfo rInfo;
...@@ -32,7 +32,7 @@ typedef struct SLDataIter { ...@@ -32,7 +32,7 @@ typedef struct SLDataIter {
SVersionRange verRange; SVersionRange verRange;
} SLDataIter; } SLDataIter;
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid, int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pRange) { STimeWindow *pTimeWindow, SVersionRange *pRange) {
int32_t code = 0; int32_t code = 0;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
...@@ -41,10 +41,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -41,10 +41,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->timeWindow = *pTimeWindow; (*pIter)->timeWindow = *pTimeWindow;
(*pIter)->verRange = *pRange; (*pIter)->verRange = *pRange;
(*pIter)->pReader = pReader; (*pIter)->pReader = pReader;
(*pIter)->iLast = iLast; (*pIter)->iSst = iSst;
(*pIter)->backward = backward; (*pIter)->backward = backward;
(*pIter)->aBlockL = taosArrayInit(0, sizeof(SBlockL)); (*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if ((*pIter)->aBlockL == NULL) { if ((*pIter)->aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -54,18 +54,18 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -54,18 +54,18 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
goto _exit; goto _exit;
} }
code = tsdbReadBlockL(pReader, iLast, (*pIter)->aBlockL); code = tsdbReadSstBlk(pReader, iSst, (*pIter)->aSstBlk);
if (code) { if (code) {
goto _exit; goto _exit;
} }
size_t size = taosArrayGetSize((*pIter)->aBlockL); size_t size = taosArrayGetSize((*pIter)->aSstBlk);
// find the start block // find the start block
int32_t index = -1; int32_t index = -1;
if (!backward) { // asc if (!backward) { // asc
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SBlockL *p = taosArrayGet((*pIter)->aBlockL, i); SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
if (p->minUid <= uid && p->maxUid >= uid) { if (p->minUid <= uid && p->maxUid >= uid) {
index = i; index = i;
break; break;
...@@ -73,7 +73,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -73,7 +73,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
} }
} else { // desc } else { // desc
for (int32_t i = size - 1; i >= 0; --i) { for (int32_t i = size - 1; i >= 0; --i) {
SBlockL *p = taosArrayGet((*pIter)->aBlockL, i); SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
if (p->minUid <= uid && p->maxUid >= uid) { if (p->minUid <= uid && p->maxUid >= uid) {
index = i; index = i;
break; break;
...@@ -81,9 +81,9 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t ...@@ -81,9 +81,9 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
} }
} }
(*pIter)->iBlockL = index; (*pIter)->iSstBlk = index;
if (index != -1) { if (index != -1) {
(*pIter)->pBlockL = taosArrayGet((*pIter)->aBlockL, (*pIter)->iBlockL); (*pIter)->pSstBlk = taosArrayGet((*pIter)->aSstBlk, (*pIter)->iSstBlk);
} }
_exit: _exit:
...@@ -92,20 +92,20 @@ _exit: ...@@ -92,20 +92,20 @@ _exit:
void tLDataIterClose(SLDataIter *pIter) { void tLDataIterClose(SLDataIter *pIter) {
tBlockDataDestroy(&pIter->bData, 1); tBlockDataDestroy(&pIter->bData, 1);
taosArrayDestroy(pIter->aBlockL); taosArrayDestroy(pIter->aSstBlk);
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
void tLDataIterNextBlock(SLDataIter *pIter) { void tLDataIterNextBlock(SLDataIter *pIter) {
int32_t step = pIter->backward ? -1 : 1; int32_t step = pIter->backward ? -1 : 1;
pIter->iBlockL += step; pIter->iSstBlk += step;
int32_t index = -1; int32_t index = -1;
size_t size = taosArrayGetSize(pIter->aBlockL); size_t size = taosArrayGetSize(pIter->aSstBlk);
for (int32_t i = pIter->iBlockL; i < size && i >= 0; i += step) { for (int32_t i = pIter->iSstBlk; i < size && i >= 0; i += step) {
SBlockL *p = taosArrayGet(pIter->aBlockL, i); SSstBlk *p = taosArrayGet(pIter->aSstBlk, i);
if ((!pIter->backward) && p->minUid > pIter->uid) { if ((!pIter->backward) && p->minUid > pIter->uid) {
break; break;
} }
...@@ -121,9 +121,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -121,9 +121,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
} }
if (index == -1) { if (index == -1) {
pIter->pBlockL = NULL; pIter->pSstBlk = NULL;
} else { } else {
pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
} }
} }
...@@ -194,14 +194,14 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -194,14 +194,14 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
int32_t step = pIter->backward ? -1 : 1; int32_t step = pIter->backward ? -1 : 1;
// no qualified last file block in current file, no need to fetch row // no qualified last file block in current file, no need to fetch row
if (pIter->pBlockL == NULL) { if (pIter->pSstBlk == NULL) {
return false; return false;
} }
int32_t iBlockL = pIter->iBlockL; int32_t iBlockL = pIter->iSstBlk;
if (pIter->bData.nRow == 0 && pIter->pBlockL != NULL) { // current block not loaded yet if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet
code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _exit; goto _exit;
} }
...@@ -216,15 +216,15 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -216,15 +216,15 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
tLDataIterNextBlock(pIter); tLDataIterNextBlock(pIter);
if (pIter->pBlockL == NULL) { // no more data if (pIter->pSstBlk == NULL) { // no more data
goto _exit; goto _exit;
} }
} else { } else {
break; break;
} }
if (iBlockL != pIter->iBlockL) { if (iBlockL != pIter->iSstBlk) {
code = tsdbReadLastBlockEx(pIter->pReader, pIter->iLast, pIter->pBlockL, &pIter->bData); code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
if (code) { if (code) {
goto _exit; goto _exit;
} }
...@@ -241,7 +241,7 @@ _exit: ...@@ -241,7 +241,7 @@ _exit:
terrno = code; terrno = code;
} }
return (code == TSDB_CODE_SUCCESS) && (pIter->pBlockL != NULL); return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL);
} }
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
......
...@@ -1158,9 +1158,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc ...@@ -1158,9 +1158,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
// todo // todo
bool overlapWithlastBlock = false; bool overlapWithlastBlock = false;
#if 0 #if 0
if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) { if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); SSstBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
} }
#endif #endif
......
...@@ -434,10 +434,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS ...@@ -434,10 +434,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
} }
// sst // sst
for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) {
tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iLast], fname); tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname);
pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ); pReader->aLastFD[iSst] = taosOpenFile(fname, TD_FILE_READ);
if (pReader->aLastFD[iLast] == NULL) { if (pReader->aLastFD[iSst] == NULL) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -475,8 +475,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { ...@@ -475,8 +475,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
} }
// sst // sst
for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nSstF; iLast++) { for (int32_t iSst = 0; iSst < (*ppReader)->pSet->nSstF; iSst++) {
if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) { if (taosCloseFile(&(*ppReader)->aLastFD[iSst]) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -559,14 +559,14 @@ _err: ...@@ -559,14 +559,14 @@ _err:
return code; return code;
} }
int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) {
int32_t code = 0; int32_t code = 0;
int64_t offset = pReader->pSet->aSstF[iLast]->offset; int64_t offset = pReader->pSet->aSstF[iSst]->offset;
int64_t size = pReader->pSet->aSstF[iLast]->size - offset; int64_t size = pReader->pSet->aSstF[iSst]->size - offset;
int64_t n; int64_t n;
uint32_t delimiter; uint32_t delimiter;
taosArrayClear(aBlockL); taosArrayClear(aSstBlk);
if (size == 0) { if (size == 0) {
goto _exit; goto _exit;
} }
...@@ -576,13 +576,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { ...@@ -576,13 +576,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) {
if (code) goto _err; if (code) goto _err;
// seek // seek
if (taosLSeekFile(pReader->aLastFD[iLast], offset, SEEK_SET) < 0) { if (taosLSeekFile(pReader->aLastFD[iSst], offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// read // read
n = taosReadFile(pReader->aLastFD[iLast], pReader->aBuf[0], size); n = taosReadFile(pReader->aLastFD[iSst], pReader->aBuf[0], size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
...@@ -603,10 +603,10 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { ...@@ -603,10 +603,10 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) {
ASSERT(delimiter == TSDB_FILE_DLMT); ASSERT(delimiter == TSDB_FILE_DLMT);
while (n < size - sizeof(TSCKSUM)) { while (n < size - sizeof(TSCKSUM)) {
SBlockL blockl; SSstBlk blockl;
n += tGetBlockL(pReader->aBuf[0] + n, &blockl); n += tGetSstBlk(pReader->aBuf[0] + n, &blockl);
if (taosArrayPush(aBlockL, &blockl) == NULL) { if (taosArrayPush(aSstBlk, &blockl) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -897,10 +897,10 @@ _err: ...@@ -897,10 +897,10 @@ _err:
return code; return code;
} }
int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData); code = tsdbReadBlockDataImpl(pReader, &pSstBlk->bInfo, 1, pBlockData);
if (code) goto _err; if (code) goto _err;
return code; return code;
...@@ -910,15 +910,15 @@ _err: ...@@ -910,15 +910,15 @@ _err:
return code; return code;
} }
int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) {
int32_t code = 0; int32_t code = 0;
// read // read
code = tsdbReadAndCheck(pReader->aLastFD[iLast], pBlockL->bInfo.offset, &pReader->aBuf[0], pBlockL->bInfo.szBlock, 0); code = tsdbReadAndCheck(pReader->aLastFD[iSst], pSstBlk->bInfo.offset, &pReader->aBuf[0], pSstBlk->bInfo.szBlock, 0);
if (code) goto _exit; if (code) goto _exit;
// decmpr // decmpr
code = tDecmprBlockData(pReader->aBuf[0], pBlockL->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
if (code) goto _exit; if (code) goto _exit;
_exit: _exit:
...@@ -952,9 +952,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -952,9 +952,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
pWriter->fHead = *pSet->pHeadF; pWriter->fHead = *pSet->pHeadF;
pWriter->fData = *pSet->pDataF; pWriter->fData = *pSet->pDataF;
pWriter->fSma = *pSet->pSmaF; pWriter->fSma = *pSet->pSmaF;
for (int8_t iLast = 0; iLast < pSet->nSstF; iLast++) { for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) {
pWriter->wSet.aSstF[iLast] = &pWriter->fSst[iLast]; pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst];
pWriter->fSst[iLast] = *pSet->aSstF[iLast]; pWriter->fSst[iSst] = *pSet->aSstF[iSst];
} }
// head // head
...@@ -1301,22 +1301,22 @@ _err: ...@@ -1301,22 +1301,22 @@ _err:
return code; return code;
} }
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) {
int32_t code = 0; int32_t code = 0;
SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
int64_t size; int64_t size;
int64_t n; int64_t n;
// check // check
if (taosArrayGetSize(aBlockL) == 0) { if (taosArrayGetSize(aSstBlk) == 0) {
pSstFile->offset = pSstFile->size; pSstFile->offset = pSstFile->size;
goto _exit; goto _exit;
} }
// size // size
size = sizeof(uint32_t); // TSDB_FILE_DLMT size = sizeof(uint32_t); // TSDB_FILE_DLMT
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL)); size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL));
} }
size += sizeof(TSCKSUM); size += sizeof(TSCKSUM);
...@@ -1327,8 +1327,8 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { ...@@ -1327,8 +1327,8 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
// encode // encode
n = 0; n = 0;
n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT); n += tPutU32(pWriter->aBuf[0] + n, TSDB_FILE_DLMT);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) {
n += tPutBlockL(pWriter->aBuf[0] + n, taosArrayGet(aBlockL, iBlockL)); n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL));
} }
taosCalcChecksumAppend(0, pWriter->aBuf[0], size); taosCalcChecksumAppend(0, pWriter->aBuf[0], size);
......
...@@ -27,9 +27,9 @@ struct STsdbSnapReader { ...@@ -27,9 +27,9 @@ struct STsdbSnapReader {
int32_t fid; int32_t fid;
SDataFReader* pDataFReader; SDataFReader* pDataFReader;
SArray* aBlockIdx; // SArray<SBlockIdx> SArray* aBlockIdx; // SArray<SBlockIdx>
SArray* aBlockL; // SArray<SBlockL> SArray* aSstBlk; // SArray<SSstBlk>
SBlockIdx* pBlockIdx; SBlockIdx* pBlockIdx;
SBlockL* pBlockL; SSstBlk* pSstBlk;
int32_t iBlockIdx; int32_t iBlockIdx;
int32_t iBlockL; int32_t iBlockL;
...@@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockL(pReader->pDataFReader, 0, pReader->aBlockL); code = tsdbReadSstBlk(pReader->pDataFReader, 0, pReader->aSstBlk);
if (code) goto _err; if (code) goto _err;
// init // init
...@@ -82,13 +82,13 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -82,13 +82,13 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
pReader->iBlockL = 0; pReader->iBlockL = 0;
while (true) { while (true) {
if (pReader->iBlockL >= taosArrayGetSize(pReader->aBlockL)) { if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) {
pReader->pBlockL = NULL; pReader->pSstBlk = NULL;
break; break;
} }
pReader->pBlockL = (SBlockL*)taosArrayGet(pReader->aBlockL, pReader->iBlockL); pReader->pSstBlk = (SSstBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL);
if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) { if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
// TODO // TODO
break; break;
} }
...@@ -101,8 +101,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -101,8 +101,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
while (true) { while (true) {
if (pReader->pBlockIdx && pReader->pBlockL) { if (pReader->pBlockIdx && pReader->pSstBlk) {
TABLEID id = {.suid = pReader->pBlockL->suid, .uid = pReader->pBlockL->minUid}; TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid};
ASSERT(0); ASSERT(0);
...@@ -142,18 +142,18 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -142,18 +142,18 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
} }
if (*ppData) goto _exit; if (*ppData) goto _exit;
} else if (pReader->pBlockL) { } else if (pReader->pSstBlk) {
while (pReader->pBlockL) { while (pReader->pSstBlk) {
if (pReader->pBlockL->minVer <= pReader->ever && pReader->pBlockL->maxVer >= pReader->sver) { if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) {
// load data (todo) // load data (todo)
} }
// next // next
pReader->iBlockL++; pReader->iBlockL++;
if (pReader->iBlockL < taosArrayGetSize(pReader->aBlockL)) { if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) {
pReader->pBlockL = (SBlockL*)taosArrayGetSize(pReader->aBlockL); pReader->pSstBlk = (SSstBlk*)taosArrayGetSize(pReader->aSstBlk);
} else { } else {
pReader->pBlockL = NULL; pReader->pSstBlk = NULL;
} }
if (*ppData) goto _exit; if (*ppData) goto _exit;
...@@ -298,8 +298,8 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type ...@@ -298,8 +298,8 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pReader->aBlockL = taosArrayInit(0, sizeof(SBlockL)); pReader->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if (pReader->aBlockL == NULL) { if (pReader->aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -338,7 +338,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { ...@@ -338,7 +338,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
if (pReader->pDataFReader) { if (pReader->pDataFReader) {
tsdbDataFReaderClose(&pReader->pDataFReader); tsdbDataFReaderClose(&pReader->pDataFReader);
} }
taosArrayDestroy(pReader->aBlockL); taosArrayDestroy(pReader->aSstBlk);
taosArrayDestroy(pReader->aBlockIdx); taosArrayDestroy(pReader->aBlockIdx);
tMapDataClear(&pReader->mBlock); tMapDataClear(&pReader->mBlock);
tBlockDataDestroy(&pReader->oBlockData, 1); tBlockDataDestroy(&pReader->oBlockData, 1);
...@@ -431,7 +431,7 @@ struct STsdbSnapWriter { ...@@ -431,7 +431,7 @@ struct STsdbSnapWriter {
SBlockData* pBlockData; SBlockData* pBlockData;
int32_t iRow; int32_t iRow;
SBlockData bDataR; SBlockData bDataR;
SArray* aBlockL; // SArray<SBlockL> SArray* aSstBlk; // SArray<SSstBlk>
int32_t iBlockL; int32_t iBlockL;
SBlockData lDataR; SBlockData lDataR;
...@@ -443,7 +443,7 @@ struct STsdbSnapWriter { ...@@ -443,7 +443,7 @@ struct STsdbSnapWriter {
SMapData mBlockW; // SMapData<SBlock> SMapData mBlockW; // SMapData<SBlock>
SArray* aBlockIdxW; // SArray<SBlockIdx> SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SBlockL> SArray* aBlockLW; // SArray<SSstBlk>
// for del file // for del file
SDelFReader* pDelFReader; SDelFReader* pDelFReader;
...@@ -845,7 +845,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { ...@@ -845,7 +845,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
// write remain stuff // write remain stuff
if (taosArrayGetSize(pWriter->aBlockLW) > 0) { if (taosArrayGetSize(pWriter->aBlockLW) > 0) {
code = tsdbWriteBlockL(pWriter->pDataFWriter, pWriter->aBlockIdxW); code = tsdbWriteSstBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW);
if (code) goto _err; if (code) goto _err;
} }
...@@ -911,12 +911,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ...@@ -911,12 +911,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockL(pWriter->pDataFReader, 0, pWriter->aBlockL); code = tsdbReadSstBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk);
if (code) goto _err; if (code) goto _err;
} else { } else {
ASSERT(pWriter->pDataFReader == NULL); ASSERT(pWriter->pDataFReader == NULL);
taosArrayClear(pWriter->aBlockIdx); taosArrayClear(pWriter->aBlockIdx);
taosArrayClear(pWriter->aBlockL); taosArrayClear(pWriter->aSstBlk);
} }
pWriter->iBlockIdx = 0; pWriter->iBlockIdx = 0;
pWriter->pBlockIdx = NULL; pWriter->pBlockIdx = NULL;
...@@ -1147,8 +1147,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1147,8 +1147,8 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = tBlockDataCreate(&pWriter->bDataR); code = tBlockDataCreate(&pWriter->bDataR);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); pWriter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
if (pWriter->aBlockL == NULL) { if (pWriter->aSstBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -1161,7 +1161,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr ...@@ -1161,7 +1161,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
code = tBlockDataCreate(&pWriter->bDataW); code = tBlockDataCreate(&pWriter->bDataW);
if (code) goto _err; if (code) goto _err;
pWriter->aBlockLW = taosArrayInit(0, sizeof(SBlockL)); pWriter->aBlockLW = taosArrayInit(0, sizeof(SSstBlk));
if (pWriter->aBlockLW == NULL) { if (pWriter->aBlockLW == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
......
...@@ -214,7 +214,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { ...@@ -214,7 +214,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
int32_t tCmprBlockL(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) {
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
SBlockL *rBlockL = (SBlockL *)rhs; SSstBlk *rBlockL = (SSstBlk *)rhs;
if (lBlockIdx->suid < rBlockL->suid) { if (lBlockIdx->suid < rBlockL->suid) {
return -1; return -1;
...@@ -311,41 +311,41 @@ bool tBlockHasSma(SBlock *pBlock) { ...@@ -311,41 +311,41 @@ bool tBlockHasSma(SBlock *pBlock) {
return pBlock->smaInfo.size > 0; return pBlock->smaInfo.size > 0;
} }
// SBlockL ====================================================== // SSstBlk ======================================================
int32_t tPutBlockL(uint8_t *p, void *ph) { int32_t tPutSstBlk(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlockL *pBlockL = (SBlockL *)ph; SSstBlk *pSstBlk = (SSstBlk *)ph;
n += tPutI64(p ? p + n : p, pBlockL->suid); n += tPutI64(p ? p + n : p, pSstBlk->suid);
n += tPutI64(p ? p + n : p, pBlockL->minUid); n += tPutI64(p ? p + n : p, pSstBlk->minUid);
n += tPutI64(p ? p + n : p, pBlockL->maxUid); n += tPutI64(p ? p + n : p, pSstBlk->maxUid);
n += tPutI64v(p ? p + n : p, pBlockL->minKey); n += tPutI64v(p ? p + n : p, pSstBlk->minKey);
n += tPutI64v(p ? p + n : p, pBlockL->maxKey); n += tPutI64v(p ? p + n : p, pSstBlk->maxKey);
n += tPutI64v(p ? p + n : p, pBlockL->minVer); n += tPutI64v(p ? p + n : p, pSstBlk->minVer);
n += tPutI64v(p ? p + n : p, pBlockL->maxVer); n += tPutI64v(p ? p + n : p, pSstBlk->maxVer);
n += tPutI32v(p ? p + n : p, pBlockL->nRow); n += tPutI32v(p ? p + n : p, pSstBlk->nRow);
n += tPutI64v(p ? p + n : p, pBlockL->bInfo.offset); n += tPutI64v(p ? p + n : p, pSstBlk->bInfo.offset);
n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szBlock); n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szBlock);
n += tPutI32v(p ? p + n : p, pBlockL->bInfo.szKey); n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szKey);
return n; return n;
} }
int32_t tGetBlockL(uint8_t *p, void *ph) { int32_t tGetSstBlk(uint8_t *p, void *ph) {
int32_t n = 0; int32_t n = 0;
SBlockL *pBlockL = (SBlockL *)ph; SSstBlk *pSstBlk = (SSstBlk *)ph;
n += tGetI64(p + n, &pBlockL->suid); n += tGetI64(p + n, &pSstBlk->suid);
n += tGetI64(p + n, &pBlockL->minUid); n += tGetI64(p + n, &pSstBlk->minUid);
n += tGetI64(p + n, &pBlockL->maxUid); n += tGetI64(p + n, &pSstBlk->maxUid);
n += tGetI64v(p + n, &pBlockL->minKey); n += tGetI64v(p + n, &pSstBlk->minKey);
n += tGetI64v(p + n, &pBlockL->maxKey); n += tGetI64v(p + n, &pSstBlk->maxKey);
n += tGetI64v(p + n, &pBlockL->minVer); n += tGetI64v(p + n, &pSstBlk->minVer);
n += tGetI64v(p + n, &pBlockL->maxVer); n += tGetI64v(p + n, &pSstBlk->maxVer);
n += tGetI32v(p + n, &pBlockL->nRow); n += tGetI32v(p + n, &pSstBlk->nRow);
n += tGetI64v(p + n, &pBlockL->bInfo.offset); n += tGetI64v(p + n, &pSstBlk->bInfo.offset);
n += tGetI32v(p + n, &pBlockL->bInfo.szBlock); n += tGetI32v(p + n, &pSstBlk->bInfo.szBlock);
n += tGetI32v(p + n, &pBlockL->bInfo.szKey); n += tGetI32v(p + n, &pSstBlk->bInfo.szKey);
return n; return n;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册