提交 478d5ebb 编写于 作者: H Hongze Cheng

more code

上级 573a4191
...@@ -262,7 +262,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS ...@@ -262,7 +262,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL); int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData); int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData);
......
...@@ -481,7 +481,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { ...@@ -481,7 +481,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
taosArrayClear(state->aBlockL); taosArrayClear(state->aBlockL);
} }
code = tsdbReadBlockL(state->pDataFReader, state->aBlockL); code = tsdbReadBlockL(state->pDataFReader, 0, state->aBlockL);
if (code) goto _err; if (code) goto _err;
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL); // SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL);
......
...@@ -35,6 +35,7 @@ typedef struct { ...@@ -35,6 +35,7 @@ typedef struct {
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
int8_t maxLast;
SArray *aTbDataP; // memory SArray *aTbDataP; // memory
STsdbFS fs; // disk STsdbFS fs; // disk
// -------------- // --------------
...@@ -45,19 +46,11 @@ typedef struct { ...@@ -45,19 +46,11 @@ typedef struct {
// commit file data // commit file data
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
// data SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aBlockIdx; // SArray<SBlockIdx> int32_t iBlockIdx;
int32_t iBlockIdx; SBlockIdx *pBlockIdx;
SBlockIdx *pBlockIdx; SMapData mBlock; // SMapData<SBlock>
SMapData mBlock; // SMapData<SBlock> SBlockData bData;
SBlockData bData;
// last
SArray *aBlockL; // SArray<SBlockL>
int32_t iBlockL;
SBlockData bDatal;
int32_t iRow;
SRowInfo *pRowInfo;
SRowInfo rowInfo;
} dReader; } dReader;
struct { struct {
SDataFWriter *pWriter; SDataFWriter *pWriter;
...@@ -437,20 +430,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -437,20 +430,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
} }
tBlockDataReset(&pCommitter->dReader.bData); tBlockDataReset(&pCommitter->dReader.bData);
// last
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL);
if (code) goto _err;
pCommitter->dReader.iBlockL = -1;
pCommitter->dReader.iRow = -1;
pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo;
tBlockDataReset(&pCommitter->dReader.bDatal);
code = tsdbCommitterNextLastRow(pCommitter);
if (code) goto _err;
} else { } else {
pCommitter->dReader.pBlockIdx = NULL; pCommitter->dReader.pBlockIdx = NULL;
pCommitter->dReader.pRowInfo = NULL;
} }
// Writer // Writer
...@@ -1273,6 +1254,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -1273,6 +1254,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pCommitter->maxLast = TSDB_DEFAULT_LAST_FILE; // TODO: make it as a config
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) { if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1301,15 +1283,6 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { ...@@ -1301,15 +1283,6 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
code = tBlockDataCreate(&pCommitter->dReader.bData); code = tBlockDataCreate(&pCommitter->dReader.bData);
if (code) goto _exit; if (code) goto _exit;
pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pCommitter->dReader.aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataCreate(&pCommitter->dReader.bDatal);
if (code) goto _exit;
// Writer // Writer
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) { if (pCommitter->dWriter.aBlockIdx == NULL) {
...@@ -1338,8 +1311,6 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { ...@@ -1338,8 +1311,6 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->dReader.aBlockIdx); taosArrayDestroy(pCommitter->dReader.aBlockIdx);
tMapDataClear(&pCommitter->dReader.mBlock); tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataDestroy(&pCommitter->dReader.bData, 1); tBlockDataDestroy(&pCommitter->dReader.bData, 1);
taosArrayDestroy(pCommitter->dReader.aBlockL);
tBlockDataDestroy(&pCommitter->dReader.bDatal, 1);
// Writer // Writer
taosArrayDestroy(pCommitter->dWriter.aBlockIdx); taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
......
...@@ -544,7 +544,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -544,7 +544,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1}; SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
// head // head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
...@@ -576,13 +576,13 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -576,13 +576,13 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
} }
// last // last
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (fSet.nLastF = 0; fSet.nLastF < pSet->nLastF; fSet.nLastF++) {
fSet.aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); fSet.aLastF[fSet.nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[iLast] == NULL) { if (fSet.aLastF[fSet.nLastF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
*fSet.aLastF[iLast] = *pSet->aLastF[iLast]; *fSet.aLastF[fSet.nLastF] = *pSet->aLastF[fSet.nLastF];
} }
} }
...@@ -981,12 +981,14 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -981,12 +981,14 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1); nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->aLastF[0]->nRef, 1);
ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
nRef = atomic_fetch_add_32(&pSet->aLastF[iLast]->nRef, 1);
ASSERT(nRef > 0);
}
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -1032,15 +1034,6 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1032,15 +1034,6 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
} }
// last
nRef = atomic_sub_fetch_32(&pSet->aLastF[0]->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->aLastF[0]);
}
// sma // sma
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
...@@ -1049,6 +1042,18 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1049,6 +1042,18 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSet->pSmaF); taosMemoryFree(pSet->pSmaF);
} }
// last
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
nRef = atomic_sub_fetch_32(&pSet->aLastF[iLast]->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->aLastF[iLast]);
/* code */
}
}
} }
taosArrayDestroy(pFS->aDFileSet); taosArrayDestroy(pFS->aDFileSet);
......
...@@ -399,8 +399,8 @@ struct SDataFReader { ...@@ -399,8 +399,8 @@ struct SDataFReader {
SDFileSet *pSet; SDFileSet *pSet;
TdFilePtr pHeadFD; TdFilePtr pHeadFD;
TdFilePtr pDataFD; TdFilePtr pDataFD;
TdFilePtr pLastFD;
TdFilePtr pSmaFD; TdFilePtr pSmaFD;
TdFilePtr aLastFD[TSDB_MAX_LAST_FILE];
uint8_t *aBuf[3]; uint8_t *aBuf[3];
}; };
...@@ -445,11 +445,13 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS ...@@ -445,11 +445,13 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
} }
// last // last
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname); for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ); tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname);
if (pReader->pLastFD == NULL) { pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ);
code = TAOS_SYSTEM_ERROR(errno); if (pReader->aLastFD[iLast] == NULL) {
goto _err; code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} }
*ppReader = pReader; *ppReader = pReader;
...@@ -465,30 +467,35 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { ...@@ -465,30 +467,35 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
int32_t code = 0; int32_t code = 0;
if (*ppReader == NULL) goto _exit; if (*ppReader == NULL) goto _exit;
// head
if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// data
if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { if (taosCloseFile(&(*ppReader)->pDataFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosCloseFile(&(*ppReader)->pLastFD) < 0) { // sma
if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { // last
code = TAOS_SYSTEM_ERROR(errno); for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nLastF; iLast++) {
goto _err; if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
} }
for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) {
tFree((*ppReader)->aBuf[iBuf]); tFree((*ppReader)->aBuf[iBuf]);
} }
taosMemoryFree(*ppReader); taosMemoryFree(*ppReader);
_exit: _exit:
...@@ -563,10 +570,10 @@ _err: ...@@ -563,10 +570,10 @@ _err:
return code; return code;
} }
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) { int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) {
int32_t code = 0; int32_t code = 0;
int64_t offset = pReader->pSet->aLastF[0]->offset; int64_t offset = pReader->pSet->aLastF[iLast]->offset;
int64_t size = pReader->pSet->aLastF[0]->size - offset; int64_t size = pReader->pSet->aLastF[iLast]->size - offset;
int64_t n; int64_t n;
uint32_t delimiter; uint32_t delimiter;
...@@ -580,13 +587,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) { ...@@ -580,13 +587,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
if (code) goto _err; if (code) goto _err;
// seek // seek
if (taosLSeekFile(pReader->pLastFD, offset, SEEK_SET) < 0) { if (taosLSeekFile(pReader->aLastFD[iLast], offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// read // read
n = taosReadFile(pReader->pLastFD, pReader->aBuf[0], size); n = taosReadFile(pReader->aLastFD[iLast], pReader->aBuf[0], size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
...@@ -745,7 +752,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo ...@@ -745,7 +752,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
tBlockDataClear(pBlockData); tBlockDataClear(pBlockData);
TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo)
// uid + version + tskey // uid + version + tskey
code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1); code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1);
......
...@@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockL(pReader->pDataFReader, pReader->aBlockL); code = tsdbReadBlockL(pReader->pDataFReader, 0, pReader->aBlockL);
if (code) goto _err; if (code) goto _err;
// init // init
...@@ -911,7 +911,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ...@@ -911,7 +911,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx);
if (code) goto _err; if (code) goto _err;
code = tsdbReadBlockL(pWriter->pDataFReader, pWriter->aBlockL); code = tsdbReadBlockL(pWriter->pDataFReader, 0, pWriter->aBlockL);
if (code) goto _err; if (code) goto _err;
} else { } else {
ASSERT(pWriter->pDataFReader == NULL); ASSERT(pWriter->pDataFReader == NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册