提交 1753d121 编写于 作者: H Hongze Cheng

more work

上级 81897d32
......@@ -77,6 +77,9 @@ typedef struct STsdbFSState STsdbFSState;
#define VERSION_MIN 0
#define VERSION_MAX INT64_MAX
#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN})
#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX})
// tsdbUtil.c ==============================================================================================
// TSDBROW
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
......@@ -110,14 +113,15 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo);
int32_t tPutBlockCol(uint8_t *p, void *ph);
int32_t tGetBlockCol(uint8_t *p, void *ph);
// SBlock
#define tBlockInit() ((SBlock){.info = tKEYINFOInit()})
#define tBlockInit() ((SBlock){0})
void tBlockReset(SBlock *pBlock);
void tBlockClear(SBlock *pBlock);
int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
// SBlockIdx
#define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()})
void tBlockIdxReset(SBlockIdx *pBlockIdx);
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
// SColdata
......@@ -328,7 +332,10 @@ struct TSDBROW {
struct SBlockIdx {
int64_t suid;
int64_t uid;
KEYINFO info;
TSKEY minKey;
TSKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t offset;
int64_t size;
};
......@@ -358,7 +365,10 @@ typedef struct {
} SSubBlock;
struct SBlock {
KEYINFO info;
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int32_t nRow;
int8_t last;
int8_t hasDup;
......
......@@ -607,12 +607,12 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) {
// SDataFWriter ====================================================
struct SDataFWriter {
STsdb *pTsdb;
SDFileSet *pSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pLastFD;
TdFilePtr pSmaFD;
STsdb *pTsdb;
SDFileSet wSet;
TdFilePtr pHeadFD;
TdFilePtr pDataFD;
TdFilePtr pLastFD;
TdFilePtr pSmaFD;
};
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) {
......@@ -630,9 +630,8 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS
goto _err;
}
pWriter->pTsdb = pTsdb;
pWriter->pSet = pSet;
// create the directory if not there
pWriter->wSet = *pSet;
pSet = &pWriter->wSet;
// head
flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
......@@ -809,10 +808,10 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
int64_t size = TSDB_FHDR_SIZE;
int64_t n;
uint8_t *pBuf = NULL;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
SDataFile *pDataFile = &pWriter->pSet->fData;
SLastFile *pLastFile = &pWriter->pSet->fLast;
SSmaFile *pSmaFile = &pWriter->pSet->fSma;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
SDataFile *pDataFile = &pWriter->wSet.fData;
SLastFile *pLastFile = &pWriter->wSet.fLast;
SSmaFile *pSmaFile = &pWriter->wSet.fSma;
// alloc
if (!ppBuf) ppBuf = &pBuf;
......@@ -904,7 +903,7 @@ _err:
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = 0;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
int64_t n = 0;
uint8_t *pBuf = NULL;
......@@ -946,7 +945,7 @@ _err:
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
SHeadFile *pHeadFile = &pWriter->wSet.fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
......
......@@ -277,13 +277,25 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) {
}
// SBlockIdx ======================================================
void tBlockIdxReset(SBlockIdx *pBlockIdx) {
pBlockIdx->minKey = TSKEY_MAX;
pBlockIdx->maxKey = TSKEY_MIN;
pBlockIdx->minVersion = VERSION_MAX;
pBlockIdx->maxVersion = VERSION_MIN;
pBlockIdx->offset = -1;
pBlockIdx->size = -1;
}
int32_t tPutBlockIdx(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockIdx *pBlockIdx = (SBlockIdx *)ph;
n += tPutI64(p ? p + n : p, pBlockIdx->suid);
n += tPutI64(p ? p + n : p, pBlockIdx->uid);
n += tPutKEYINFO(p ? p + n : p, &pBlockIdx->info);
n += tPutI64(p ? p + n : p, pBlockIdx->minKey);
n += tPutI64(p ? p + n : p, pBlockIdx->maxKey);
n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion);
n += tPutI64v(p ? p + n : p, pBlockIdx->offset);
n += tPutI64v(p ? p + n : p, pBlockIdx->size);
......@@ -296,7 +308,10 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
n += tGetI64(p + n, &pBlockIdx->suid);
n += tGetI64(p + n, &pBlockIdx->uid);
n += tGetKEYINFO(p + n, &pBlockIdx->info);
n += tGetI64(p + n, &pBlockIdx->minKey);
n += tGetI64(p + n, &pBlockIdx->maxKey);
n += tGetI64v(p + n, &pBlockIdx->minVersion);
n += tGetI64v(p + n, &pBlockIdx->maxVersion);
n += tGetI64v(p + n, &pBlockIdx->offset);
n += tGetI64v(p + n, &pBlockIdx->size);
......@@ -305,7 +320,10 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) {
// SBlock ======================================================
void tBlockReset(SBlock *pBlock) {
pBlock->info = tKEYINFOInit();
pBlock->minKey = TSDBKEY_MAX;
pBlock->maxKey = TSDBKEY_MIN;
pBlock->minVersion = VERSION_MAX;
pBlock->maxVersion = VERSION_MIN;
pBlock->nRow = 0;
pBlock->last = -1;
pBlock->cmprAlg = -1;
......@@ -328,7 +346,10 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
n += tPutKEYINFO(p ? p + n : p, &pBlock->info);
n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey);
n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey);
n += tPutI64v(p ? p + n : p, pBlock->minVersion);
n += tPutI64v(p ? p + n : p, pBlock->maxVersion);
n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->last);
n += tPutI8(p ? p + n : p, pBlock->hasDup);
......@@ -348,7 +369,10 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
n += tGetKEYINFO(p + n, &pBlock->info);
n += tGetTSDBKEY(p + n, &pBlock->minKey);
n += tGetTSDBKEY(p + n, &pBlock->maxKey);
n += tGetI64v(p + n, &pBlock->minVersion);
n += tGetI64v(p + n, &pBlock->maxVersion);
n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->last);
n += tGetI8(p + n, &pBlock->hasDup);
......@@ -369,9 +393,9 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
SBlock *pBlock1 = (SBlock *)p1;
SBlock *pBlock2 = (SBlock *)p2;
if (tsdbKeyCmprFn(&pBlock1->info.maxKey, &pBlock2->info.minKey) < 0) {
if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
return -1;
} else if (tsdbKeyCmprFn(&pBlock1->info.minKey, &pBlock2->info.maxKey) > 0) {
} else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) {
return 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册