提交 81897d32 编写于 作者: H Hongze Cheng

more work

上级 4b33695f
......@@ -65,6 +65,7 @@ typedef struct SDelFReader SDelFReader;
typedef struct SRowIter SRowIter;
typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
typedef struct STsdbFSState STsdbFSState;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
......@@ -146,6 +147,8 @@ void tMapDataReset(SMapData *pMapData);
void tMapDataClear(SMapData *pMapData);
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
// other
......@@ -166,8 +169,8 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ==============================================================================================
enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE };
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]);
typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT;
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]);
// SDelFile
#define tsdbDelFileCreate() \
((SDelFile){ \
......@@ -179,6 +182,11 @@ int32_t tsdbFSClose(STsdbFS *pFS);
int32_t tsdbFSBegin(STsdbFS *pFS);
int32_t tsdbFSCommit(STsdbFS *pFS);
int32_t tsdbFSRollback(STsdbFS *pFS);
int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile);
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet);
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState);
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
// tsdbReaderWriter.c ==============================================================================================
// SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
......@@ -499,6 +507,14 @@ struct SRowMerger {
SArray *pArray; // SArray<SColVal>
};
struct STsdbFS {
STsdb *pTsdb;
TdThreadRwlock lock;
int8_t inTxn;
STsdbFSState *cState;
STsdbFSState *nState;
};
#ifdef __cplusplus
}
#endif
......
......@@ -649,8 +649,8 @@ _err:
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL; // TODO
SDFileSet *pWSet = NULL; // TODO
SDFileSet *pRSet = NULL;
SDFileSet *pWSet = NULL;
// memory
pCommitter->nextKey = TSKEY_MAX;
......@@ -660,6 +660,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
tMapDataReset(&pCommitter->oBlockMap);
tBlockReset(&pCommitter->oBlock);
tBlockDataReset(&pCommitter->oBlockData);
pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err;
......@@ -673,6 +674,22 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
tMapDataReset(&pCommitter->nBlockMap);
tBlockReset(&pCommitter->nBlock);
tBlockDataReset(&pCommitter->nBlockData);
if (pRSet) {
pWSet = &(SDFileSet){.diskId = pRSet->diskId,
.fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = pRSet->fData,
.fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = pRSet->fSma};
} else {
SDiskID did = {.level = 0, .id = 0}; // TODO: alloc a new one
pWSet = &(SDFileSet){.diskId = did,
.fid = pCommitter->commitFid,
.fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0},
.fData = {.commitID = pCommitter->commitID, .size = 0},
.fLast = {.commitID = pCommitter->commitID, .size = 0},
.fSma = {.commitID = pCommitter->commitID, .size = 0}};
}
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
if (code) goto _err;
......
......@@ -15,18 +15,10 @@
#include "tsdb.h"
typedef struct {
struct STsdbFSState {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet>
SDelFile delFile;
} STsdbFSState;
struct STsdbFS {
STsdb *pTsdb;
TdThreadRwlock lock;
int8_t inTxn;
STsdbFSState *cState;
STsdbFSState *nState;
};
// =================================================================================================
......@@ -373,8 +365,8 @@ static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSS
// SDFileSet
while (iFrom < nFrom && iTo < nTo) {
pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom);
pDFileSetTo = (SDFileSet *)taosArrayGetP(pTo->aDFileSet, iTo);
pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
pDFileSetTo = (SDFileSet *)taosArrayGet(pTo->aDFileSet, iTo);
if (pDFileSetFrom->fid == pDFileSetTo->fid) {
code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, pDFileSetTo);
......@@ -393,7 +385,7 @@ static int32_t tsdbFSApplyDiskChange(STsdbFS *pFS, STsdbFSState *pFrom, STsdbFSS
}
while (iFrom < nFrom) {
pDFileSetFrom = (SDFileSet *)taosArrayGetP(pFrom->aDFileSet, iFrom);
pDFileSetFrom = (SDFileSet *)taosArrayGet(pFrom->aDFileSet, iFrom);
code = tsdbApplyDFileSetChange(pFS, pDFileSetFrom, NULL);
if (code) goto _err;
......@@ -613,6 +605,16 @@ _err:
return code;
}
static int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
if (((SDFileSet *)p1)->fid < ((SDFileSet *)p2)->fid) {
return -1;
} else if (((SDFileSet *)p1)->fid > ((SDFileSet *)p2)->fid) {
return 1;
}
return 0;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
......@@ -654,12 +656,17 @@ int32_t tsdbFSBegin(STsdbFS *pFS) {
ASSERT(!pFS->inTxn);
pFS->inTxn = 1;
// SDelFile
pFS->nState->pDelFile = NULL;
if (pFS->cState->pDelFile) {
pFS->nState->delFile = pFS->cState->delFile;
pFS->nState->pDelFile = &pFS->nState->delFile;
}
pFS->nState->pDelFile = pFS->cState->pDelFile;
// SArray<aDFileSet>
taosArrayClear(pFS->nState->aDFileSet);
for (int32_t iDFileSet = 0; iDFileSet < taosArrayGetSize(pFS->cState->aDFileSet); iDFileSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGetP(pFS->cState->aDFileSet, iDFileSet);
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->cState->aDFileSet); iSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->cState->aDFileSet, iSet);
if (taosArrayPush(pFS->nState->aDFileSet, &pDFileSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -667,6 +674,7 @@ int32_t tsdbFSBegin(STsdbFS *pFS) {
}
}
pFS->inTxn = 1;
return code;
_err:
......@@ -713,3 +721,42 @@ _err:
tsdbError("vgId:%d tsdb fs rollback failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSStateUpsertDelFile(STsdbFSState *pState, SDelFile *pDelFile) {
int32_t code = 0;
pState->delFile = *pDelFile;
pState->pDelFile = &pState->delFile;
return code;
}
int32_t tsdbFSStateUpsertDFileSet(STsdbFSState *pState, SDFileSet *pSet) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pState->aDFileSet, pSet, tDFileSetCmprFn, TD_GE);
if (idx < 0) {
if (taosArrayPush(pState->aDFileSet, pSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} else {
SDFileSet *tDFileSet = (SDFileSet *)taosArrayGet(pState->aDFileSet, idx);
int32_t c = tDFileSetCmprFn(pSet, tDFileSet);
if (c == 0) {
taosArraySet(pState->aDFileSet, idx, pSet);
} else {
if (taosArrayInsert(pState->aDFileSet, idx, pSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
}
_exit:
return code;
}
SDelFile *tsdbFSStateGetDelFile(STsdbFSState *pState) { return pState->pDelFile; }
SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid) {
return (SDFileSet *)taosArraySearch(pState->aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ);
}
\ No newline at end of file
......@@ -15,7 +15,7 @@
#include "tsdb.h"
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]) {
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]) {
STfs *pTfs = pTsdb->pVnode->pTfs;
switch (ftype) {
......
......@@ -405,8 +405,58 @@ struct SDataFReader {
};
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
// TODO
int32_t code = 0;
SDataFReader *pReader;
char fname[TSDB_FILENAME_LEN];
// alloc
pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTsdb = pTsdb;
pReader->pSet = pSet;
// open impl
// head
tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname);
pReader->pHeadFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pHeadFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// data
tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname);
pReader->pDataFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pDataFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// last
tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname);
pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// sma
tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname);
pReader->pSmaFD = taosOpenFile(fname, TD_FILE_READ);
if (pReader->pSmaFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
*ppReader = pReader;
return code;
_err:
tsdbError("vgId:%d tsdb data file reader open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
......@@ -566,8 +616,139 @@ struct SDataFWriter {
};
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
int32_t code = 0;
// TODO
int32_t code = 0;
int32_t flag;
int64_t n;
SDataFWriter *pWriter = NULL;
char fname[TSDB_FILENAME_LEN];
char hdr[TSDB_FHDR_SIZE] = {0};
// alloc
pWriter = taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTsdb = pTsdb;
pWriter->pSet = pSet;
// create the directory if not there
// head
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
tsdbDataFileName(pTsdb, pSet, TSDB_HEAD_FILE, fname);
pWriter->pHeadFD = taosOpenFile(fname, flag);
if (pWriter->pHeadFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosWriteFile(pWriter->pHeadFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == TSDB_FHDR_SIZE);
pSet->fHead.size += TSDB_FHDR_SIZE;
// data
if (pSet->fData.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pSet, TSDB_DATA_FILE, fname);
pWriter->pDataFD = taosOpenFile(fname, flag);
if (pWriter->pDataFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pSet->fData.size == 0) {
n = taosWriteFile(pWriter->pDataFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSet->fData.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pDataFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pSet->fData.size);
}
// last
if (pSet->fLast.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pSet, TSDB_LAST_FILE, fname);
pWriter->pLastFD = taosOpenFile(fname, flag);
if (pWriter->pLastFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pSet->fLast.size == 0) {
n = taosWriteFile(pWriter->pLastFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSet->fLast.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pLastFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pSet->fLast.size);
}
// sma
if (pSet->fSma.size == 0) {
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
} else {
flag = TD_FILE_WRITE;
}
tsdbDataFileName(pTsdb, pSet, TSDB_SMA_FILE, fname);
pWriter->pSmaFD = taosOpenFile(fname, flag);
if (pWriter->pSmaFD == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (pSet->fSma.size == 0) {
n = taosWriteFile(pWriter->pSmaFD, hdr, TSDB_FHDR_SIZE);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pSet->fSma.size += TSDB_FHDR_SIZE;
} else {
n = taosLSeekFile(pWriter->pSmaFD, 0, SEEK_END);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(n == pSet->fSma.size);
}
*ppWriter = pWriter;
return code;
_err:
tsdbError("vgId:%d tsdb data file writer open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
......
......@@ -70,6 +70,34 @@ static int32_t tMapDataGetOffset(SMapData *pMapData, int32_t idx) {
ASSERT(0);
}
}
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) {
int32_t code = 0;
int32_t lidx = 0;
int32_t ridx = pMapData->nItem - 1;
int32_t midx;
int32_t c;
while (lidx <= ridx) {
midx = (lidx + ridx) / 2;
tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn);
c = tItemCmprFn(pSearchItem, pItem);
if (c == 0) {
goto _exit;
} else if (c < 0) {
ridx = midx - 1;
} else {
lidx = midx + 1;
}
}
code = TSDB_CODE_NOT_FOUND;
_exit:
return code;
}
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) {
int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册