提交 d814016e 编写于 作者: H Hongze Cheng

more work

上级 2a41f1d3
......@@ -183,6 +183,9 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ==============================================================================================
typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT;
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype);
int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype);
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype);
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype);
int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile);
int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile);
......@@ -205,7 +208,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
......
......@@ -782,7 +782,13 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
if (code) goto _err;
tBlockReset(pBlockN);
pBlockN->last = 1;
pBlockN->minKey = pBlockO->minKey;
pBlockN->maxKey = pBlockO->maxKey;
pBlockN->minVersion = pBlockO->minVersion;
pBlockN->maxVersion = pBlockO->maxVersion;
pBlockN->nRow = pBlockO->nRow;
pBlockN->last = pBlockO->last;
pBlockN->hasDup = pBlockO->hasDup;
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg);
if (code) goto _err;
......@@ -964,7 +970,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if (code) goto _err;
// update file header
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL);
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
if (code) goto _err;
// upsert SDFileSet
......
......@@ -171,7 +171,71 @@ _err:
static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) {
int32_t code = 0;
// TODO
char fname[TSDB_FILENAME_LEN];
if (pFrom && pTo) {
// head
if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) {
ASSERT(0);
} else {
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
taosRemoveFile(fname);
}
// data
if (tsdbFileIsSame(pFrom, pTo, TSDB_DATA_FILE)) {
if (pFrom->fData.size > pTo->fData.size) {
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE);
if (code) goto _err;
}
} else {
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname);
taosRemoveFile(fname);
}
// last
if (tsdbFileIsSame(pFrom, pTo, TSDB_LAST_FILE)) {
if (pFrom->fLast.size > pTo->fLast.size) {
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE);
if (code) goto _err;
}
} else {
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname);
taosRemoveFile(fname);
}
// sma
if (tsdbFileIsSame(pFrom, pTo, TSDB_SMA_FILE)) {
if (pFrom->fSma.size > pTo->fSma.size) {
code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE);
if (code) goto _err;
}
} else {
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname);
taosRemoveFile(fname);
}
} else if (pFrom) {
// head
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
taosRemoveFile(fname);
// data
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname);
taosRemoveFile(fname);
// last
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname);
taosRemoveFile(fname);
// fsm
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname);
taosRemoveFile(fname);
}
return code;
_err:
tsdbError("vgId:%d tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -120,6 +120,107 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char
}
}
bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype) {
if (pDFileSet1->diskId.level != pDFileSet2->diskId.level || pDFileSet1->diskId.id != pDFileSet2->diskId.id) {
return false;
}
switch (ftype) {
case TSDB_HEAD_FILE:
return pDFileSet1->fHead.commitID == pDFileSet2->fHead.commitID;
case TSDB_DATA_FILE:
return pDFileSet1->fData.commitID == pDFileSet2->fData.commitID;
case TSDB_LAST_FILE:
return pDFileSet1->fLast.commitID == pDFileSet2->fLast.commitID;
case TSDB_SMA_FILE:
return pDFileSet1->fSma.commitID == pDFileSet2->fSma.commitID;
default:
ASSERT(0);
break;
}
}
int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype) {
int32_t code = 0;
int64_t n;
char hdr[TSDB_FHDR_SIZE];
memset(hdr, 0, TSDB_FHDR_SIZE);
tPutDataFileHdr(hdr, pSet, ftype);
taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE);
n = taosLSeekFile(pFD, 0, SEEK_SET);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
n = taosWriteFile(pFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
_exit:
return code;
}
int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) {
int32_t code = 0;
int64_t size;
TdFilePtr pFD;
char fname[TSDB_FILENAME_LEN];
tsdbDataFileName(pTsdb, pSet, ftype, fname);
// open
pFD = taosOpenFile(fname, TD_FILE_WRITE);
if (pFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// truncate
switch (ftype) {
case TSDB_HEAD_FILE:
size = pSet->fHead.size;
break;
case TSDB_DATA_FILE:
size = pSet->fData.size;
break;
case TSDB_LAST_FILE:
size = pSet->fLast.size;
break;
case TSDB_SMA_FILE:
size = pSet->fSma.size;
break;
default:
ASSERT(0);
}
if (taosFtruncateFile(pFD, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// update header
code = tsdbUpdateDFileHdr(pFD, pSet, ftype);
if (code) goto _err;
// sync
if (taosFsyncFile(pFD) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// close
taosCloseFile(&pFD);
return code;
_err:
return code;
}
int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) {
int32_t n = 0;
......
......@@ -1047,99 +1047,35 @@ _err:
return code;
}
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) {
int32_t code = 0;
int64_t size = TSDB_FHDR_SIZE;
int64_t n;
uint8_t *pBuf = NULL;
uint8_t hdr[TSDB_FHDR_SIZE];
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
SDataFile *pDataFile = &pWriter->wSet.fData;
SLastFile *pLastFile = &pWriter->wSet.fLast;
SSmaFile *pSmaFile = &pWriter->wSet.fSma;
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tsdbRealloc(ppBuf, size);
if (code) goto _err;
// head ==============
// build
memset(*ppBuf, 0, size);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_HEAD_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_HEAD_FILE);
if (code) goto _err;
// data ==============
memset(*ppBuf, 0, size);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_DATA_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pDataFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_DATA_FILE);
if (code) goto _err;
// last ==============
memset(*ppBuf, 0, size);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_LAST_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pLastFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_LAST_FILE);
if (code) goto _err;
// sma ==============
memset(*ppBuf, 0, size);
tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_SMA_FILE);
taosCalcChecksumAppend(0, *ppBuf, size);
// seek
if (taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// write
n = taosWriteFile(pWriter->pSmaFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_SMA_FILE);
if (code) goto _err;
tsdbFree(pBuf);
return code;
_err:
tsdbFree(pBuf);
tsdbError("vgId:%d update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -355,6 +355,7 @@ void tBlockReset(SBlock *pBlock) {
pBlock->aSubBlock[iSubBlock].nRow = 0;
pBlock->aSubBlock[iSubBlock].cmprAlg = -1;
pBlock->aSubBlock[iSubBlock].offset = -1;
pBlock->aSubBlock[iSubBlock].vsize = -1;
pBlock->aSubBlock[iSubBlock].ksize = -1;
pBlock->aSubBlock[iSubBlock].bsize = -1;
tMapDataReset(&pBlock->aSubBlock->mBlockCol);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册