提交 fa257f50 编写于 作者: H Hongze Cheng

more code

上级 1ae53f1a
......@@ -589,13 +589,13 @@ struct SDataFWriter {
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pLastFD;
TdFilePtr pSmaFD;
TdFilePtr pLastFD;
SHeadFile fHead;
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
SLastFile fLast[TSDB_MAX_LAST_FILE];
uint8_t *aBuf[4];
};
......
......@@ -437,31 +437,43 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// Writer
SHeadFile fHead;
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aLastF[0] = &fLast, .pSmaF = &fSma};
SLastFile fLast;
SDFileSet wSet = {0};
if (pRSet) {
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
ASSERT(pRSet->nLastF < pCommitter->maxLast);
fHead = (SHeadFile){.commitID = pCommitter->commitID};
fData = *pRSet->pDataF;
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
fSma = *pRSet->pSmaF;
fLast = (SLastFile){.commitID = pCommitter->commitID};
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
wSet.pHeadF = &fHead;
wSet.pDataF = &fData;
wSet.pSmaF = &fSma;
for (int8_t iLast = 0; iLast < pRSet->nLastF; iLast++) {
wSet.aLastF[iLast] = pRSet->aLastF[iLast];
}
wSet.nLastF = pRSet->nLastF + 1;
wSet.aLastF[wSet.nLastF - 1] = &fLast; // todo
} else {
SDiskID did = {0};
fHead = (SHeadFile){.commitID = pCommitter->commitID};
fData = (SDataFile){.commitID = pCommitter->commitID};
fSma = (SSmaFile){.commitID = pCommitter->commitID};
fLast = (SLastFile){.commitID = pCommitter->commitID};
SDiskID did = {0};
tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did);
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
wSet.diskId = did;
wSet.fid = pCommitter->commitFid;
wSet.pHeadF = &fHead;
wSet.pDataF = &fData;
wSet.pSmaF = &fSma;
wSet.nLastF = 1;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0, .offset = 0};
fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
wSet.aLastF[0] = &fLast;
}
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err;
......
......@@ -629,13 +629,35 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
if (c == 0) {
*pDFileSet->pHeadF = *pSet->pHeadF;
*pDFileSet->pDataF = *pSet->pDataF;
*pDFileSet->aLastF[0] = *pSet->aLastF[0];
*pDFileSet->pSmaF = *pSet->pSmaF;
// last
if (pSet->nLastF > pDFileSet->nLastF) {
ASSERT(pSet->nLastF == pDFileSet->nLastF + 1);
pDFileSet->aLastF[pDFileSet->nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (pDFileSet->aLastF[pDFileSet->nLastF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*pDFileSet->aLastF[pDFileSet->nLastF] = *pSet->aLastF[pSet->nLastF - 1];
pDFileSet->nLastF++;
} else if (pSet->nLastF < pDFileSet->nLastF) {
ASSERT(pSet->nLastF == 1);
for (int32_t iLast = 1; iLast < pDFileSet->nLastF; iLast++) {
taosMemoryFree(pDFileSet->aLastF[iLast]);
}
*pDFileSet->aLastF[0] = *pSet->aLastF[0];
pDFileSet->nLastF = 1;
} else {
ASSERT(0);
}
goto _exit;
}
}
ASSERT(pSet->nLastF == 1);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1};
// head
......@@ -654,14 +676,6 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
}
*fSet.pDataF = *pSet->pDataF;
// last
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.aLastF[0] = *pSet->aLastF[0];
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
......@@ -670,6 +684,14 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
}
*fSet.pSmaF = *pSet->pSmaF;
// last
fSet.aLastF[0] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile));
if (fSet.aLastF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.aLastF[0] = *pSet->aLastF[0];
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......
......@@ -936,18 +936,22 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (code) goto _err;
pWriter->pTsdb = pTsdb;
pWriter->wSet = (SDFileSet){.diskId = pSet->diskId,
.fid = pSet->fid,
.pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData,
.aLastF[0] = &pWriter->fLast,
.pSmaF = &pWriter->fSma};
pWriter->wSet = (SDFileSet){
.diskId = pSet->diskId,
.fid = pSet->fid,
.pHeadF = &pWriter->fHead,
.pDataF = &pWriter->fData,
.pSmaF = &pWriter->fSma,
.nLastF = pSet->nLastF //
};
pWriter->fHead = *pSet->pHeadF;
pWriter->fData = *pSet->pDataF;
pWriter->fLast = *pSet->aLastF[0];
pWriter->fSma = *pSet->pSmaF;
for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) {
pWriter->wSet.aLastF[iLast] = &pWriter->fLast[iLast];
pWriter->fLast[iLast] = *pSet->aLastF[iLast];
}
// head
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
......@@ -998,36 +1002,6 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
ASSERT(n == pWriter->fData.size);
}
// last
if (pWriter->fLast.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
}
tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast, fname);
pWriter->pLastFD = taosOpenFile(fname, flag);
if (pWriter->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pWriter->fLast.size == 0) {
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pWriter->fLast.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pWriter->fLast.size);
}
// sma
if (pWriter->fSma.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
......@@ -1058,6 +1032,22 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
ASSERT(n == pWriter->fSma.size);
}
// last
ASSERT(pWriter->fLast[pSet->nLastF - 1].size == 0);
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbLastFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fLast[pSet->nLastF - 1], fname);
pWriter->pLastFD = taosOpenFile(fname, flag);
if (pWriter->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pWriter->fLast[pWriter->wSet.nLastF - 1].size += TSDB_FHDR_SIZE;
*ppWriter = pWriter;
return code;
......@@ -1085,12 +1075,12 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
goto _err;
}
if (taosFsyncFile((*ppWriter)->pLastFD) < 0) {
if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile((*ppWriter)->pSmaFD) < 0) {
if (taosFsyncFile((*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......@@ -1106,12 +1096,12 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) {
goto _err;
}
if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosCloseFile(&(*ppWriter)->pSmaFD) < 0) {
if (taosCloseFile(&(*ppWriter)->pLastFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......@@ -1168,35 +1158,35 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
goto _err;
}
// last ==============
// sma ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutLastFile(hdr, &pWriter->fLast);
tPutSmaFile(hdr, &pWriter->fSma);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET);
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// sma ==============
// last ==============
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutSmaFile(hdr, &pWriter->fSma);
tPutLastFile(hdr, &pWriter->fLast[pWriter->wSet.nLastF - 1]);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET);
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -1309,7 +1299,7 @@ _err:
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL) {
int32_t code = 0;
SLastFile *pLastFile = &pWriter->fLast;
SLastFile *pLastFile = &pWriter->fLast[pWriter->wSet.nLastF - 1];
int64_t size;
int64_t n;
......@@ -1437,7 +1427,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
ASSERT(pBlockData->nRow > 0);
pBlkInfo->offset = toLast ? pWriter->fLast.size : pWriter->fData.size;
pBlkInfo->offset = toLast ? pWriter->fLast[pWriter->wSet.nLastF - 1].size : pWriter->fData.size;
pBlkInfo->szBlock = 0;
pBlkInfo->szKey = 0;
......@@ -1481,7 +1471,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
// update info
if (toLast) {
pWriter->fLast.size += pBlkInfo->szBlock;
pWriter->fLast[pWriter->wSet.nLastF - 1].size += pBlkInfo->szBlock;
} else {
pWriter->fData.size += pBlkInfo->szBlock;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册