提交 3f90f99d 编写于 作者: H Hongze Cheng

first step optimization

上级 726006d1
......@@ -67,6 +67,7 @@ typedef struct SBlockCol SBlockCol;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_MAX_LAST_FILE 16
#define TSDB_FHDR_SIZE 512
#define HAS_NONE ((int8_t)0x1)
......@@ -556,8 +557,9 @@ struct SDFileSet {
int32_t fid;
SHeadFile *pHeadF;
SDataFile *pDataF;
SLastFile *pLastF;
SSmaFile *pSmaF;
uint8_t nLastF;
SLastFile *aLastF[TSDB_MAX_LAST_FILE];
};
struct SRowIter {
......
......@@ -458,10 +458,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma};
if (pRSet) {
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
fData = *pRSet->pDataF;
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
......@@ -475,6 +476,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
wSet.diskId = did;
wSet.fid = pCommitter->commitFid;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
......
......@@ -254,8 +254,10 @@ void tsdbFSDestroy(STsdbFS *pFS) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet);
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pLastF);
taosMemoryFree(pSet->pSmaF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
taosMemoryFree(pSet->aLastF[iLast]);
}
}
taosArrayDestroy(pFS->aDFileSet);
......@@ -310,12 +312,12 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
}
// last ===========
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != pSet->pLastF->size) {
if (size != pSet->aLastF[0]->size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
......@@ -382,41 +384,15 @@ static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) {
taosArrayClear(pTsdb->fs.aDFileSet);
n += tGetU32v(pData + n, &nSet);
for (uint32_t iSet = 0; iSet < nSet; iSet++) {
SDFileSet fSet;
// head
fSet.pHeadF = (SHeadFile *)taosMemoryCalloc(1, sizeof(SHeadFile));
if (fSet.pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
fSet.pHeadF->nRef = 1;
SDFileSet fSet = {0};
// data
fSet.pDataF = (SDataFile *)taosMemoryCalloc(1, sizeof(SDataFile));
if (fSet.pDataF == NULL) {
int32_t nt = tGetDFileSet(pData + n, &fSet);
if (nt < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
fSet.pDataF->nRef = 1;
// last
fSet.pLastF = (SLastFile *)taosMemoryCalloc(1, sizeof(SLastFile));
if (fSet.pLastF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
fSet.pLastF->nRef = 1;
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryCalloc(1, sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
fSet.pSmaF->nRef = 1;
n += tGetDFileSet(pData + n, &fSet);
n += nt;
if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -533,8 +509,8 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
taosMemoryFree(pSet->pDataF);
// last
ASSERT(pSet->pLastF->nRef == 1);
taosMemoryFree(pSet->pLastF);
ASSERT(pSet->aLastF[0]->nRef == 1);
taosMemoryFree(pSet->aLastF[0]);
// sma
ASSERT(pSet->pSmaF->nRef == 1);
......@@ -568,7 +544,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1};
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
......@@ -586,15 +562,15 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
}
*fSet.pDataF = *pSet->pDataF;
// data
fSet.pLastF = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.pLastF == NULL) {
// last
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.pLastF = *pSet->pLastF;
*fSet.aLastF[0] = *pSet->aLastF[0];
// last
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -651,14 +627,14 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
if (c == 0) {
*pDFileSet->pHeadF = *pSet->pHeadF;
*pDFileSet->pDataF = *pSet->pDataF;
*pDFileSet->pLastF = *pSet->pLastF;
*pDFileSet->aLastF[0] = *pSet->aLastF[0];
*pDFileSet->pSmaF = *pSet->pSmaF;
goto _exit;
}
}
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1};
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
......@@ -676,15 +652,15 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
}
*fSet.pDataF = *pSet->pDataF;
// data
fSet.pLastF = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.pLastF == NULL) {
// last
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.pLastF = *pSet->pLastF;
*fSet.aLastF[0] = *pSet->aLastF[0];
// last
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -837,24 +813,24 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
}
// last
fSet.pLastF = pSetOld->pLastF;
if ((!sameDisk) || (pSetOld->pLastF->commitID != pSetNew->pLastF->commitID)) {
pSetOld->pLastF = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (pSetOld->pLastF == NULL) {
fSet.aLastF[0] = pSetOld->aLastF[0];
if ((!sameDisk) || (pSetOld->aLastF[0]->commitID != pSetNew->aLastF[0]->commitID)) {
pSetOld->aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (pSetOld->aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*pSetOld->pLastF = *pSetNew->pLastF;
pSetOld->pLastF->nRef = 1;
*pSetOld->aLastF[0] = *pSetNew->aLastF[0];
pSetOld->aLastF[0]->nRef = 1;
nRef = atomic_sub_fetch_32(&fSet.pLastF->nRef, 1);
nRef = atomic_sub_fetch_32(&fSet.aLastF[0]->nRef, 1);
if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.pLastF, fname);
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, fSet.aLastF[0], fname);
taosRemoveFile(fname);
taosMemoryFree(fSet.pLastF);
taosMemoryFree(fSet.aLastF[0]);
}
} else {
ASSERT(pSetOld->pLastF->size == pSetNew->pLastF->size);
ASSERT(pSetOld->aLastF[0]->size == pSetNew->aLastF[0]->size);
}
// sma
......@@ -902,11 +878,11 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
taosMemoryFree(pSetOld->pDataF);
}
nRef = atomic_sub_fetch_32(&pSetOld->pLastF->nRef, 1);
nRef = atomic_sub_fetch_32(&pSetOld->aLastF[0]->nRef, 1);
if (nRef == 0) {
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->pLastF, fname);
tsdbLastFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aLastF[0], fname);
taosRemoveFile(fname);
taosMemoryFree(pSetOld->pLastF);
taosMemoryFree(pSetOld->aLastF[0]);
}
nRef = atomic_sub_fetch_32(&pSetOld->pSmaF->nRef, 1);
......@@ -922,6 +898,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
_add_new:
fSet.diskId = pSetNew->diskId;
fSet.fid = pSetNew->fid;
fSet.nLastF = 1;
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
......@@ -942,13 +919,13 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
fSet.pDataF->nRef = 1;
// last
fSet.pLastF = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.pLastF == NULL) {
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
*fSet.pLastF = *pSetNew->pLastF;
fSet.pLastF->nRef = 1;
*fSet.aLastF[0] = *pSetNew->aLastF[0];
fSet.aLastF[0]->nRef = 1;
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
......@@ -1002,7 +979,7 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->pLastF->nRef, 1);
nRef = atomic_fetch_add_32(&pSet->aLastF[0]->nRef, 1);
ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
......@@ -1054,12 +1031,12 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
}
// last
nRef = atomic_sub_fetch_32(&pSet->pLastF->nRef, 1);
nRef = atomic_sub_fetch_32(&pSet->aLastF[0]->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname);
taosRemoveFile(fname);
taosMemoryFree(pSet->pLastF);
taosMemoryFree(pSet->aLastF[0]);
}
// sma
......
......@@ -195,8 +195,10 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tPutSmaFile(p ? p + n : p, pSet->pSmaF);
// last
n += tPutU8(p ? p + n : p, 1); // for future compatibility
n += tPutLastFile(p ? p + n : p, pSet->pLastF);
n += tPutU8(p ? p + n : p, pSet->nLastF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
n += tPutLastFile(p ? p + n : p, pSet->aLastF[iLast]);
}
return n;
}
......@@ -208,15 +210,40 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
n += tGetI32v(p + n, &pSet->diskId.id);
n += tGetI32v(p + n, &pSet->fid);
// data
// head
pSet->pHeadF = (SHeadFile *)taosMemoryCalloc(1, sizeof(SHeadFile));
if (pSet->pHeadF == NULL) {
return -1;
}
pSet->pHeadF->nRef = 1;
n += tGetHeadFile(p + n, pSet->pHeadF);
// data
pSet->pDataF = (SDataFile *)taosMemoryCalloc(1, sizeof(SDataFile));
if (pSet->pDataF == NULL) {
return -1;
}
pSet->pDataF->nRef = 1;
n += tGetDataFile(p + n, pSet->pDataF);
// sma
pSet->pSmaF = (SSmaFile *)taosMemoryCalloc(1, sizeof(SSmaFile));
if (pSet->pSmaF == NULL) {
return -1;
}
pSet->pSmaF->nRef = 1;
n += tGetSmaFile(p + n, pSet->pSmaF);
// last
uint8_t nLast;
n += tGetU8(p + n, &nLast);
n += tGetLastFile(p + n, pSet->pLastF);
n += tGetU8(p + n, &pSet->nLastF);
for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) {
pSet->aLastF[iLast] = (SLastFile *)taosMemoryCalloc(1, sizeof(SLastFile));
if (pSet->aLastF[iLast] == NULL) {
return -1;
}
pSet->aLastF[iLast]->nRef = 1;
n += tGetLastFile(p + n, pSet->aLastF[iLast]);
}
return n;
}
......
......@@ -436,14 +436,6 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
goto _err;
}
// last
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pLastF, fname);
pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// sma
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ);
......@@ -452,6 +444,14 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
goto _err;
}
// last
tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname);
pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
*ppReader = pReader;
return code;
......@@ -565,8 +565,8 @@ _err:
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) {
int32_t code = 0;
int64_t offset = pReader->pSet->pLastF->offset;
int64_t size = pReader->pSet->pLastF->size - offset;
int64_t offset = pReader->pSet->aLastF[0]->offset;
int64_t size = pReader->pSet->aLastF[0]->size - offset;
int64_t n;
uint32_t delimiter;
......@@ -935,11 +935,11 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
.fid = pSet->fid,
.pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData,
.pLastF = &pWriter->fLast,
.aLastF[0] = &pWriter->fLast,
.pSmaF = &pWriter->fSma};
pWriter->fHead = *pSet->pHeadF;
pWriter->fData = *pSet->pDataF;
pWriter->fLast = *pSet->pLastF;
pWriter->fLast = *pSet->aLastF[0];
pWriter->fSma = *pSet->pSmaF;
// head
......@@ -1554,8 +1554,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
taosCloseFile(&PInFD);
// last
tsdbLastFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pLastF, fNameFrom);
tsdbLastFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pLastF, fNameTo);
tsdbLastFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aLastF[0], fNameFrom);
tsdbLastFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aLastF[0], fNameTo);
pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (pOutFD == NULL) {
......@@ -1569,7 +1569,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->pLastF->size);
n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aLastF[0]->size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......
......@@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->pLastF);
taosMemoryFree(pSet->aLastF[0]);
taosMemoryFree(pSet->pSmaF);
taosArrayRemove(fs.aDFileSet, iSet);
iSet--;
......
......@@ -933,11 +933,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma};
if (pSet) {
wSet.diskId = pSet->diskId;
wSet.fid = fid;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = *pSet->pDataF;
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
......@@ -945,6 +946,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
} else {
wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0, .offset = 0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册