提交 1132a7f7 编写于 作者: H Hongze Cheng

refact code

上级 3286ab2a
...@@ -50,7 +50,7 @@ typedef struct SBlockData SBlockData; ...@@ -50,7 +50,7 @@ typedef struct SBlockData SBlockData;
typedef struct SDelFile SDelFile; typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile; typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile; typedef struct SDataFile SDataFile;
typedef struct SLastFile SLastFile; typedef struct SSstFile SSstFile;
typedef struct SSmaFile SSmaFile; typedef struct SSmaFile SSmaFile;
typedef struct SDFileSet SDFileSet; typedef struct SDFileSet SDFileSet;
typedef struct SDataFWriter SDataFWriter; typedef struct SDataFWriter SDataFWriter;
...@@ -219,7 +219,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2); ...@@ -219,7 +219,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2);
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype); int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype);
int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile); int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile);
int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile); int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile);
int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile); int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile);
int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile); int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile);
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile); int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
...@@ -228,7 +228,7 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet); ...@@ -228,7 +228,7 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet);
void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]); void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]);
void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]); void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]);
void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]); void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]);
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]); void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]);
// SDelFile // SDelFile
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
...@@ -541,7 +541,7 @@ struct SDataFile { ...@@ -541,7 +541,7 @@ struct SDataFile {
int64_t size; int64_t size;
}; };
struct SLastFile { struct SSstFile {
volatile int32_t nRef; volatile int32_t nRef;
int64_t commitID; int64_t commitID;
...@@ -562,8 +562,8 @@ struct SDFileSet { ...@@ -562,8 +562,8 @@ struct SDFileSet {
SHeadFile *pHeadF; SHeadFile *pHeadF;
SDataFile *pDataF; SDataFile *pDataF;
SSmaFile *pSmaF; SSmaFile *pSmaF;
uint8_t nLastF; uint8_t nSstF;
SLastFile *aLastF[TSDB_MAX_LAST_FILE]; SSstFile *aSstF[TSDB_MAX_LAST_FILE];
}; };
struct SRowIter { struct SRowIter {
...@@ -598,7 +598,7 @@ struct SDataFWriter { ...@@ -598,7 +598,7 @@ struct SDataFWriter {
SHeadFile fHead; SHeadFile fHead;
SDataFile fData; SDataFile fData;
SSmaFile fSma; SSmaFile fSma;
SLastFile fLast[TSDB_MAX_LAST_FILE]; SSstFile fSst[TSDB_MAX_LAST_FILE];
uint8_t *aBuf[4]; uint8_t *aBuf[4];
}; };
......
...@@ -431,9 +431,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -431,9 +431,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
pCommitter->toLastOnly = 0; pCommitter->toLastOnly = 0;
SDataFReader *pReader = pCommitter->dReader.pReader; SDataFReader *pReader = pCommitter->dReader.pReader;
if (pReader) { if (pReader) {
if (pReader->pSet->nLastF >= pCommitter->maxLast) { if (pReader->pSet->nSstF >= pCommitter->maxLast) {
int8_t iIter = 0; int8_t iIter = 0;
for (int32_t iLast = 0; iLast < pReader->pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pReader->pSet->nSstF; iLast++) {
pIter = &pCommitter->aDataIter[iIter]; pIter = &pCommitter->aDataIter[iIter];
pIter->type = LAST_DATA_ITER; pIter->type = LAST_DATA_ITER;
pIter->iLast = iLast; pIter->iLast = iLast;
...@@ -457,9 +457,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { ...@@ -457,9 +457,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
iIter++; iIter++;
} }
} else { } else {
for (int32_t iLast = 0; iLast < pReader->pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pReader->pSet->nSstF; iLast++) {
SLastFile *pLastFile = pReader->pSet->aLastF[iLast]; SSstFile *pSstFile = pReader->pSet->aSstF[iLast];
if (pLastFile->size > pLastFile->offset) { if (pSstFile->size > pSstFile->offset) {
pCommitter->toLastOnly = 1; pCommitter->toLastOnly = 1;
break; break;
} }
...@@ -515,29 +515,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -515,29 +515,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SHeadFile fHead = {.commitID = pCommitter->commitID}; SHeadFile fHead = {.commitID = pCommitter->commitID};
SDataFile fData = {.commitID = pCommitter->commitID}; SDataFile fData = {.commitID = pCommitter->commitID};
SSmaFile fSma = {.commitID = pCommitter->commitID}; SSmaFile fSma = {.commitID = pCommitter->commitID};
SLastFile fLast = {.commitID = pCommitter->commitID}; SSstFile fSst = {.commitID = pCommitter->commitID};
SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
if (pRSet) { if (pRSet) {
ASSERT(pRSet->nLastF <= pCommitter->maxLast); ASSERT(pRSet->nSstF <= pCommitter->maxLast);
fData = *pRSet->pDataF; fData = *pRSet->pDataF;
fSma = *pRSet->pSmaF; fSma = *pRSet->pSmaF;
wSet.diskId = pRSet->diskId; wSet.diskId = pRSet->diskId;
if (pRSet->nLastF < pCommitter->maxLast) { if (pRSet->nSstF < pCommitter->maxLast) {
for (int32_t iLast = 0; iLast < pRSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pRSet->nSstF; iLast++) {
wSet.aLastF[iLast] = pRSet->aLastF[iLast]; wSet.aSstF[iLast] = pRSet->aSstF[iLast];
} }
wSet.nLastF = pRSet->nLastF + 1; wSet.nSstF = pRSet->nSstF + 1;
} else { } else {
wSet.nLastF = 1; wSet.nSstF = 1;
} }
} else { } else {
SDiskID did = {0}; SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
wSet.diskId = did; wSet.diskId = did;
wSet.nLastF = 1; wSet.nSstF = 1;
} }
wSet.aLastF[wSet.nLastF - 1] = &fLast; wSet.aSstF[wSet.nSstF - 1] = &fSst;
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err; if (code) goto _err;
......
...@@ -255,8 +255,8 @@ void tsdbFSDestroy(STsdbFS *pFS) { ...@@ -255,8 +255,8 @@ void tsdbFSDestroy(STsdbFS *pFS) {
taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pSmaF); taosMemoryFree(pSet->pSmaF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
taosMemoryFree(pSet->aLastF[iLast]); taosMemoryFree(pSet->aSstF[iLast]);
} }
} }
...@@ -312,12 +312,12 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { ...@@ -312,12 +312,12 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
} }
// last =========== // last ===========
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname); tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[0], fname);
if (taosStatFile(fname, &size, NULL)) { if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (size != pSet->aLastF[0]->size) { if (size != pSet->aSstF[0]->size) {
code = TSDB_CODE_FILE_CORRUPTED; code = TSDB_CODE_FILE_CORRUPTED;
goto _err; goto _err;
} }
...@@ -509,8 +509,8 @@ int32_t tsdbFSClose(STsdb *pTsdb) { ...@@ -509,8 +509,8 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
// last // last
ASSERT(pSet->aLastF[0]->nRef == 1); ASSERT(pSet->aSstF[0]->nRef == 1);
taosMemoryFree(pSet->aLastF[0]); taosMemoryFree(pSet->aSstF[0]);
// sma // sma
ASSERT(pSet->pSmaF->nRef == 1); ASSERT(pSet->pSmaF->nRef == 1);
...@@ -571,13 +571,13 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -571,13 +571,13 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
*fSet.pSmaF = *pSet->pSmaF; *fSet.pSmaF = *pSet->pSmaF;
// last // last
for (fSet.nLastF = 0; fSet.nLastF < pSet->nLastF; fSet.nLastF++) { for (fSet.nSstF = 0; fSet.nSstF < pSet->nSstF; fSet.nSstF++) {
fSet.aLastF[fSet.nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); fSet.aSstF[fSet.nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (fSet.aLastF[fSet.nLastF] == NULL) { if (fSet.aSstF[fSet.nSstF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
*fSet.aLastF[fSet.nLastF] = *pSet->aLastF[fSet.nLastF]; *fSet.aSstF[fSet.nSstF] = *pSet->aSstF[fSet.nSstF];
} }
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
...@@ -631,27 +631,27 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { ...@@ -631,27 +631,27 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
*pDFileSet->pDataF = *pSet->pDataF; *pDFileSet->pDataF = *pSet->pDataF;
*pDFileSet->pSmaF = *pSet->pSmaF; *pDFileSet->pSmaF = *pSet->pSmaF;
// last // last
if (pSet->nLastF > pDFileSet->nLastF) { if (pSet->nSstF > pDFileSet->nSstF) {
ASSERT(pSet->nLastF == pDFileSet->nLastF + 1); ASSERT(pSet->nSstF == pDFileSet->nSstF + 1);
pDFileSet->aLastF[pDFileSet->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); pDFileSet->aSstF[pDFileSet->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pDFileSet->aLastF[pDFileSet->nLastF] == NULL) { if (pDFileSet->aSstF[pDFileSet->nSstF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
*pDFileSet->aLastF[pDFileSet->nLastF] = *pSet->aLastF[pSet->nLastF - 1]; *pDFileSet->aSstF[pDFileSet->nSstF] = *pSet->aSstF[pSet->nSstF - 1];
pDFileSet->nLastF++; pDFileSet->nSstF++;
} else if (pSet->nLastF < pDFileSet->nLastF) { } else if (pSet->nSstF < pDFileSet->nSstF) {
ASSERT(pSet->nLastF == 1); ASSERT(pSet->nSstF == 1);
for (int32_t iLast = 1; iLast < pDFileSet->nLastF; iLast++) { for (int32_t iLast = 1; iLast < pDFileSet->nSstF; iLast++) {
taosMemoryFree(pDFileSet->aLastF[iLast]); taosMemoryFree(pDFileSet->aSstF[iLast]);
} }
*pDFileSet->aLastF[0] = *pSet->aLastF[0]; *pDFileSet->aSstF[0] = *pSet->aSstF[0];
pDFileSet->nLastF = 1; pDFileSet->nSstF = 1;
} else { } else {
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
*pDFileSet->aLastF[iLast] = *pSet->aLastF[iLast]; *pDFileSet->aSstF[iLast] = *pSet->aSstF[iLast];
} }
} }
...@@ -659,8 +659,8 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { ...@@ -659,8 +659,8 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
} }
} }
ASSERT(pSet->nLastF == 1); ASSERT(pSet->nSstF == 1);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1}; SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSstF = 1};
// head // head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
...@@ -687,12 +687,12 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { ...@@ -687,12 +687,12 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
*fSet.pSmaF = *pSet->pSmaF; *fSet.pSmaF = *pSet->pSmaF;
// last // last
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (fSet.aLastF[0] == NULL) { if (fSet.aSstF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
*fSet.aLastF[0] = *pSet->aLastF[0]; *fSet.aSstF[0] = *pSet->aSstF[0];
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) { if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -862,74 +862,74 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -862,74 +862,74 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
// last // last
if (sameDisk) { if (sameDisk) {
if (pSetNew->nLastF > pSetOld->nLastF) { if (pSetNew->nSstF > pSetOld->nSstF) {
ASSERT(pSetNew->nLastF = pSetOld->nLastF + 1); ASSERT(pSetNew->nSstF = pSetOld->nSstF + 1);
pSetOld->aLastF[pSetOld->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); pSetOld->aSstF[pSetOld->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aLastF[pSetOld->nLastF] == NULL) { if (pSetOld->aSstF[pSetOld->nSstF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aLastF[pSetOld->nLastF] = *pSetNew->aLastF[pSetOld->nLastF]; *pSetOld->aSstF[pSetOld->nSstF] = *pSetNew->aSstF[pSetOld->nSstF];
pSetOld->aLastF[pSetOld->nLastF]->nRef = 1; pSetOld->aSstF[pSetOld->nSstF]->nRef = 1;
pSetOld->nLastF++; pSetOld->nSstF++;
} else if (pSetNew->nLastF < pSetOld->nLastF) { } else if (pSetNew->nSstF < pSetOld->nSstF) {
ASSERT(pSetNew->nLastF == 1); ASSERT(pSetNew->nSstF == 1);
for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) {
SLastFile *pLastFile = pSetOld->aLastF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iLast];
nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pLastFile); taosMemoryFree(pSstFile);
} }
pSetOld->aLastF[iLast] = NULL; pSetOld->aSstF[iLast] = NULL;
} }
pSetOld->nLastF = 1; pSetOld->nSstF = 1;
pSetOld->aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); pSetOld->aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aLastF[0] == NULL) { if (pSetOld->aSstF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aLastF[0] = *pSetNew->aLastF[0]; *pSetOld->aSstF[0] = *pSetNew->aSstF[0];
pSetOld->aLastF[0]->nRef = 1; pSetOld->aSstF[0]->nRef = 1;
} else { } else {
for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) {
SLastFile *pLastFile = pSetOld->aLastF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iLast];
nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pLastFile); taosMemoryFree(pSstFile);
} }
pSetOld->aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); pSetOld->aSstF[iLast] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aLastF[iLast] == NULL) { if (pSetOld->aSstF[iLast] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aLastF[iLast] = *pSetNew->aLastF[iLast]; *pSetOld->aSstF[iLast] = *pSetNew->aSstF[iLast];
pSetOld->aLastF[iLast]->nRef = 1; pSetOld->aSstF[iLast]->nRef = 1;
} }
} }
} else { } else {
ASSERT(pSetOld->nLastF == pSetNew->nLastF); ASSERT(pSetOld->nSstF == pSetNew->nSstF);
for (int32_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSetOld->nSstF; iLast++) {
SLastFile *pLastFile = pSetOld->aLastF[iLast]; SSstFile *pSstFile = pSetOld->aSstF[iLast];
nRef = atomic_sub_fetch_32(&pLastFile->nRef, 1); nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pLastFile, fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pLastFile); taosMemoryFree(pSstFile);
} }
pSetOld->aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); pSetOld->aSstF[iLast] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (pSetOld->aLastF[iLast] == NULL) { if (pSetOld->aSstF[iLast] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*pSetOld->aLastF[iLast] = *pSetNew->aLastF[iLast]; *pSetOld->aSstF[iLast] = *pSetNew->aSstF[iLast];
pSetOld->aLastF[iLast]->nRef = 1; pSetOld->aSstF[iLast]->nRef = 1;
} }
} }
...@@ -963,12 +963,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -963,12 +963,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
taosMemoryFree(pSetOld->pSmaF); taosMemoryFree(pSetOld->pSmaF);
} }
for (int8_t iLast = 0; iLast < pSetOld->nLastF; iLast++) { for (int8_t iLast = 0; iLast < pSetOld->nSstF; iLast++) {
nRef = atomic_sub_fetch_32(&pSetOld->aLastF[iLast]->nRef, 1); nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iLast]->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aLastF[iLast], fname); tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iLast], fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSetOld->aLastF[iLast]); taosMemoryFree(pSetOld->aSstF[iLast]);
} }
} }
...@@ -976,7 +976,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -976,7 +976,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
continue; continue;
_add_new: _add_new:
fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nLastF = 1}; fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSstF = 1};
// head // head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
...@@ -1006,14 +1006,14 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { ...@@ -1006,14 +1006,14 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
fSet.pSmaF->nRef = 1; fSet.pSmaF->nRef = 1;
// last // last
ASSERT(pSetNew->nLastF == 1); ASSERT(pSetNew->nSstF == 1);
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile));
if (fSet.aLastF[0] == NULL) { if (fSet.aSstF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
*fSet.aLastF[0] = *pSetNew->aLastF[0]; *fSet.aSstF[0] = *pSetNew->aSstF[0];
fSet.aLastF[0]->nRef = 1; fSet.aSstF[0]->nRef = 1;
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1061,8 +1061,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1061,8 +1061,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
nRef = atomic_fetch_add_32(&pSet->aLastF[iLast]->nRef, 1); nRef = atomic_fetch_add_32(&pSet->aSstF[iLast]->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
} }
...@@ -1121,13 +1121,13 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { ...@@ -1121,13 +1121,13 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
} }
// last // last
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
nRef = atomic_sub_fetch_32(&pSet->aLastF[iLast]->nRef, 1); nRef = atomic_sub_fetch_32(&pSet->aSstF[iLast]->nRef, 1);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iLast], fname);
taosRemoveFile(fname); taosRemoveFile(fname);
taosMemoryFree(pSet->aLastF[iLast]); taosMemoryFree(pSet->aSstF[iLast]);
/* code */ /* code */
} }
} }
......
...@@ -53,22 +53,22 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) { ...@@ -53,22 +53,22 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) {
return n; return n;
} }
int32_t tPutLastFile(uint8_t *p, SLastFile *pLastFile) { int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile) {
int32_t n = 0; int32_t n = 0;
n += tPutI64v(p ? p + n : p, pLastFile->commitID); n += tPutI64v(p ? p + n : p, pSstFile->commitID);
n += tPutI64v(p ? p + n : p, pLastFile->size); n += tPutI64v(p ? p + n : p, pSstFile->size);
n += tPutI64v(p ? p + n : p, pLastFile->offset); n += tPutI64v(p ? p + n : p, pSstFile->offset);
return n; return n;
} }
static int32_t tGetLastFile(uint8_t *p, SLastFile *pLastFile) { static int32_t tGetSstFile(uint8_t *p, SSstFile *pSstFile) {
int32_t n = 0; int32_t n = 0;
n += tGetI64v(p + n, &pLastFile->commitID); n += tGetI64v(p + n, &pSstFile->commitID);
n += tGetI64v(p + n, &pLastFile->size); n += tGetI64v(p + n, &pSstFile->size);
n += tGetI64v(p + n, &pLastFile->offset); n += tGetI64v(p + n, &pSstFile->offset);
return n; return n;
} }
...@@ -102,9 +102,9 @@ void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, ...@@ -102,9 +102,9 @@ void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF,
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data"); TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data");
} }
void tsdbLastFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SLastFile *pLastF, char fname[]) { void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did), snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pLastF->commitID, ".last"); TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSstF->commitID, ".sst");
} }
void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) { void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) {
...@@ -195,9 +195,9 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -195,9 +195,9 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF); n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
// last // last
n += tPutU8(p ? p + n : p, pSet->nLastF); n += tPutU8(p ? p + n : p, pSet->nSstF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
n += tPutLastFile(p ? p + n : p, pSet->aLastF[iLast]); n += tPutSstFile(p ? p + n : p, pSet->aSstF[iLast]);
} }
return n; return n;
...@@ -235,14 +235,14 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { ...@@ -235,14 +235,14 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tGetSmaFile(p + n, pSet->pSmaF); n += tGetSmaFile(p + n, pSet->pSmaF);
// last // last
n += tGetU8(p + n, &pSet->nLastF); n += tGetU8(p + n, &pSet->nSstF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
pSet->aLastF[iLast] = (SLastFile *)taosMemoryCalloc(1, sizeof(SLastFile)); pSet->aSstF[iLast] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile));
if (pSet->aLastF[iLast] == NULL) { if (pSet->aSstF[iLast] == NULL) {
return -1; return -1;
} }
pSet->aLastF[iLast]->nRef = 1; pSet->aSstF[iLast]->nRef = 1;
n += tGetLastFile(p + n, pSet->aLastF[iLast]); n += tGetSstFile(p + n, pSet->aSstF[iLast]);
} }
return n; return n;
......
...@@ -99,12 +99,12 @@ void tLDataIterClose(SLDataIter *pIter) { ...@@ -99,12 +99,12 @@ void tLDataIterClose(SLDataIter *pIter) {
extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData);
void tLDataIterNextBlock(SLDataIter *pIter) { void tLDataIterNextBlock(SLDataIter *pIter) {
int32_t step = pIter->backward? -1:1; int32_t step = pIter->backward ? -1 : 1;
pIter->iBlockL += step; pIter->iBlockL += step;
int32_t index = -1; int32_t index = -1;
size_t size = taosArrayGetSize(pIter->aBlockL); size_t size = taosArrayGetSize(pIter->aBlockL);
for(int32_t i = pIter->iBlockL; i < size && i >= 0; i += step) { for (int32_t i = pIter->iBlockL; i < size && i >= 0; i += step) {
SBlockL *p = taosArrayGet(pIter->aBlockL, i); SBlockL *p = taosArrayGet(pIter->aBlockL, i);
if ((!pIter->backward) && p->minUid > pIter->uid) { if ((!pIter->backward) && p->minUid > pIter->uid) {
break; break;
...@@ -122,15 +122,15 @@ void tLDataIterNextBlock(SLDataIter *pIter) { ...@@ -122,15 +122,15 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
if (index == -1) { if (index == -1) {
pIter->pBlockL = NULL; pIter->pBlockL = NULL;
} else { } else {
pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL); pIter->pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, pIter->iBlockL);
} }
} }
static void findNextValidRow(SLDataIter* pIter) { static void findNextValidRow(SLDataIter *pIter) {
int32_t step = pIter->backward? -1:1; int32_t step = pIter->backward ? -1 : 1;
bool hasVal = false; bool hasVal = false;
int32_t i = pIter->iRow; int32_t i = pIter->iRow;
for (; i < pIter->bData.nRow && i >= 0; i += step) { for (; i < pIter->bData.nRow && i >= 0; i += step) {
if (pIter->bData.aUid != NULL) { if (pIter->bData.aUid != NULL) {
...@@ -150,7 +150,7 @@ static void findNextValidRow(SLDataIter* pIter) { ...@@ -150,7 +150,7 @@ static void findNextValidRow(SLDataIter* pIter) {
} }
int64_t ts = pIter->bData.aTSKEY[i]; int64_t ts = pIter->bData.aTSKEY[i];
if (!pIter->backward) { // asc if (!pIter->backward) { // asc
if (ts > pIter->timeWindow.ekey) { // no more data if (ts > pIter->timeWindow.ekey) { // no more data
break; break;
} else if (ts < pIter->timeWindow.skey) { } else if (ts < pIter->timeWindow.skey) {
...@@ -186,12 +186,12 @@ static void findNextValidRow(SLDataIter* pIter) { ...@@ -186,12 +186,12 @@ static void findNextValidRow(SLDataIter* pIter) {
break; break;
} }
pIter->iRow = (hasVal)? i:-1; pIter->iRow = (hasVal) ? i : -1;
} }
bool tLDataIterNextRow(SLDataIter *pIter) { bool tLDataIterNextRow(SLDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
int32_t step = pIter->backward? -1:1; int32_t step = pIter->backward ? -1 : 1;
// no qualified last file block in current file, no need to fetch row // no qualified last file block in current file, no need to fetch row
if (pIter->pBlockL == NULL) { if (pIter->pBlockL == NULL) {
...@@ -206,12 +206,12 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -206,12 +206,12 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
goto _exit; goto _exit;
} }
pIter->iRow = (pIter->backward)? pIter->bData.nRow:-1; pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1;
} }
pIter->iRow += step; pIter->iRow += step;
while(1) { while (1) {
findNextValidRow(pIter); findNextValidRow(pIter);
if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) { if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
...@@ -237,16 +237,14 @@ bool tLDataIterNextRow(SLDataIter *pIter) { ...@@ -237,16 +237,14 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
_exit: _exit:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
} }
return (code == TSDB_CODE_SUCCESS) && (pIter->pBlockL != NULL); return (code == TSDB_CODE_SUCCESS) && (pIter->pBlockL != NULL);
} }
SRowInfo *tLDataIterGet(SLDataIter *pIter) { SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
return &pIter->rInfo;
}
// SMergeTree ================================================= // SMergeTree =================================================
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
...@@ -271,16 +269,17 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { ...@@ -271,16 +269,17 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
} }
} }
void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange) { void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid, STimeWindow *pTimeWindow,
SVersionRange *pVerRange) {
pMTree->backward = backward; pMTree->backward = backward;
pMTree->pIter = NULL; pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
for(int32_t i = 0; i < pFReader->pSet->nLastF; ++i) { // open all last file for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file
/*int32_t code = */tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); /*int32_t code = */ tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
bool hasVal = tLDataIterNextRow(pIterList[i]); bool hasVal = tLDataIterNextRow(pIterList[i]);
if (hasVal) { if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIterList[i]); taosArrayPush(pMTree->pIterList, &pIterList[i]);
...@@ -293,7 +292,7 @@ void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, ...@@ -293,7 +292,7 @@ void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader,
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
bool tMergeTreeNext(SMergeTree* pMTree) { bool tMergeTreeNext(SMergeTree *pMTree) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pMTree->pIter) { if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter; SLDataIter *pIter = pMTree->pIter;
...@@ -326,14 +325,12 @@ bool tMergeTreeNext(SMergeTree* pMTree) { ...@@ -326,14 +325,12 @@ bool tMergeTreeNext(SMergeTree* pMTree) {
return pMTree->pIter != NULL; return pMTree->pIter != NULL;
} }
TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) { TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; }
return pMTree->pIter->rInfo.row;
}
void tMergeTreeClose(SMergeTree* pMTree) { void tMergeTreeClose(SMergeTree *pMTree) {
size_t size = taosArrayGetSize(pMTree->pIterList); size_t size = taosArrayGetSize(pMTree->pIterList);
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SLDataIter* pIter = taosArrayGetP(pMTree->pIterList, i); SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i);
tLDataIterClose(pIter); tLDataIterClose(pIter);
} }
......
...@@ -176,10 +176,10 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR ...@@ -176,10 +176,10 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
STsdbReader* pReader, bool* freeTSRow); STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STSRow** pTSRow); STsdbReader* pReader, STSRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader); STsdbReader* pReader);
...@@ -617,7 +617,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ...@@ -617,7 +617,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
} }
} }
pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nLastF; pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSstF;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
...@@ -1374,7 +1374,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1374,7 +1374,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) { bool mergeBlockData) {
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
...@@ -1860,9 +1860,7 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { ...@@ -1860,9 +1860,7 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
return key.ts; return key.ts;
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
return pLastBlockReader->mergeTree.pIter != NULL;
}
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) { STsdbReader* pReader) {
...@@ -1958,7 +1956,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -1958,7 +1956,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} }
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed. // no data in last block and block, no need to proceed.
...@@ -2185,7 +2182,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { ...@@ -2185,7 +2182,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return code; return code;
} }
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nLastF > 0) { if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSstF > 0) {
code = doLoadFileBlock(pReader, pIndexList, pBlockNum); code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
...@@ -2296,7 +2293,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2296,7 +2293,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = pStatus->pTableIter; STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo->uid, pReader->pFileReader); bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo->uid, pReader->pFileReader);
if (!hasVal) { if (!hasVal) {
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
...@@ -2946,7 +2943,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -2946,7 +2943,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
} }
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow) { STSRow** pTSRow) {
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
...@@ -3020,7 +3017,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3020,7 +3017,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
} }
if (pBlockScanInfo->iter.hasVal && pRow != NULL) { if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader,
freeTSRow);
} }
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
......
...@@ -434,8 +434,8 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS ...@@ -434,8 +434,8 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
} }
// last // last
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < pSet->nSstF; iLast++) {
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iLast], fname);
pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ); pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ);
if (pReader->aLastFD[iLast] == NULL) { if (pReader->aLastFD[iLast] == NULL) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -475,7 +475,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { ...@@ -475,7 +475,7 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
} }
// last // last
for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nLastF; iLast++) { for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nSstF; iLast++) {
if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) { if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
...@@ -561,8 +561,8 @@ _err: ...@@ -561,8 +561,8 @@ _err:
int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) {
int32_t code = 0; int32_t code = 0;
int64_t offset = pReader->pSet->aLastF[iLast]->offset; int64_t offset = pReader->pSet->aSstF[iLast]->offset;
int64_t size = pReader->pSet->aLastF[iLast]->size - offset; int64_t size = pReader->pSet->aSstF[iLast]->size - offset;
int64_t n; int64_t n;
uint32_t delimiter; uint32_t delimiter;
...@@ -947,14 +947,14 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -947,14 +947,14 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
.pHeadF = &pWriter->fHead, .pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData, .pDataF = &pWriter->fData,
.pSmaF = &pWriter->fSma, .pSmaF = &pWriter->fSma,
.nLastF = pSet->nLastF // .nSstF = pSet->nSstF //
}; };
pWriter->fHead = *pSet->pHeadF; pWriter->fHead = *pSet->pHeadF;
pWriter->fData = *pSet->pDataF; pWriter->fData = *pSet->pDataF;
pWriter->fSma = *pSet->pSmaF; pWriter->fSma = *pSet->pSmaF;
for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { for (int8_t iLast = 0; iLast < pSet->nSstF; iLast++) {
pWriter->wSet.aLastF[iLast] = &pWriter->fLast[iLast]; pWriter->wSet.aSstF[iLast] = &pWriter->fSst[iLast];
pWriter->fLast[iLast] = *pSet->aLastF[iLast]; pWriter->fSst[iLast] = *pSet->aSstF[iLast];
} }
// head // head
...@@ -1037,9 +1037,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -1037,9 +1037,9 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
} }
// last // last
ASSERT(pWriter->fLast[pSet->nLastF - 1].size == 0); ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0);
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast[pSet->nLastF - 1], fname); tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname);
pWriter->pLastFD = taosOpenFile(fname, flag); pWriter->pLastFD = taosOpenFile(fname, flag);
if (pWriter->pLastFD == NULL) { if (pWriter->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -1050,7 +1050,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS ...@@ -1050,7 +1050,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pWriter->fLast[pWriter->wSet.nLastF - 1].size += TSDB_FHDR_SIZE; pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE;
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
...@@ -1181,7 +1181,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { ...@@ -1181,7 +1181,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
// last ============== // last ==============
memset(hdr, 0, TSDB_FHDR_SIZE); memset(hdr, 0, TSDB_FHDR_SIZE);
tPutLastFile(hdr, &pWriter->fLast[pWriter->wSet.nLastF - 1]); tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET); n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET);
...@@ -1302,14 +1302,14 @@ _err: ...@@ -1302,14 +1302,14 @@ _err:
} }
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
int32_t code = 0; int32_t code = 0;
SLastFile *pLastFile = &pWriter->fLast[pWriter->wSet.nLastF - 1]; SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1];
int64_t size; int64_t size;
int64_t n; int64_t n;
// check // check
if (taosArrayGetSize(aBlockL) == 0) { if (taosArrayGetSize(aBlockL) == 0) {
pLastFile->offset = pLastFile->size; pSstFile->offset = pSstFile->size;
goto _exit; goto _exit;
} }
...@@ -1342,12 +1342,12 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) { ...@@ -1342,12 +1342,12 @@ int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
} }
// update // update
pLastFile->offset = pLastFile->size; pSstFile->offset = pSstFile->size;
pLastFile->size += size; pSstFile->size += size;
_exit: _exit:
tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), tsdbTrace("vgId:%d tsdb write blockl, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode),
pLastFile->offset, size); pSstFile->offset, size);
return code; return code;
_err: _err:
...@@ -1431,7 +1431,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1431,7 +1431,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
ASSERT(pBlockData->nRow > 0); ASSERT(pBlockData->nRow > 0);
pBlkInfo->offset = toLast ? pWriter->fLast[pWriter->wSet.nLastF - 1].size : pWriter->fData.size; pBlkInfo->offset = toLast ? pWriter->fSst[pWriter->wSet.nSstF - 1].size : pWriter->fData.size;
pBlkInfo->szBlock = 0; pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0; pBlkInfo->szKey = 0;
...@@ -1475,7 +1475,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ...@@ -1475,7 +1475,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
// update info // update info
if (toLast) { if (toLast) {
pWriter->fLast[pWriter->wSet.nLastF - 1].size += pBlkInfo->szBlock; pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock;
} else { } else {
pWriter->fData.size += pBlkInfo->szBlock; pWriter->fData.size += pBlkInfo->szBlock;
} }
...@@ -1555,8 +1555,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { ...@@ -1555,8 +1555,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile(&PInFD); taosCloseFile(&PInFD);
// last // last
tsdbLastFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aLastF[0], fNameFrom); tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom);
tsdbLastFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aLastF[0], fNameTo); tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) { if (pOutFD == NULL) {
...@@ -1570,7 +1570,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { ...@@ -1570,7 +1570,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto _err; goto _err;
} }
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aLastF[0]->size); n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size);
if (n < 0) { if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
......
...@@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { ...@@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (expLevel < 0) { if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->aLastF[0]); taosMemoryFree(pSet->aSstF[0]);
taosMemoryFree(pSet->pSmaF); taosMemoryFree(pSet->pSmaF);
taosArrayRemove(fs.aDFileSet, iSet); taosArrayRemove(fs.aDFileSet, iSet);
iSet--; iSet--;
......
...@@ -931,25 +931,25 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ...@@ -931,25 +931,25 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
// write // write
SHeadFile fHead; SHeadFile fHead;
SDataFile fData; SDataFile fData;
SLastFile fLast; SSstFile fLast;
SSmaFile fSma; SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma}; SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSstF[0] = &fLast, .pSmaF = &fSma};
if (pSet) { if (pSet) {
wSet.diskId = pSet->diskId; wSet.diskId = pSet->diskId;
wSet.fid = fid; wSet.fid = fid;
wSet.nLastF = 1; wSet.nSstF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = *pSet->pDataF; fData = *pSet->pDataF;
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0}; fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0};
fSma = *pSet->pSmaF; fSma = *pSet->pSmaF;
} else { } else {
wSet.diskId = (SDiskID){.level = 0, .id = 0}; wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid; wSet.fid = fid;
wSet.nLastF = 1; wSet.nSstF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册