提交 b25e55cf 编写于 作者: H Hongze Cheng

more work

上级 62f9403e
...@@ -48,6 +48,7 @@ typedef struct SOffset SOffset; ...@@ -48,6 +48,7 @@ typedef struct SOffset SOffset;
typedef struct SMapData SMapData; typedef struct SMapData SMapData;
typedef struct SColData SColData; typedef struct SColData SColData;
typedef struct SColDataBlock SColDataBlock; typedef struct SColDataBlock SColDataBlock;
typedef struct SBlockSMA SBlockSMA;
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
...@@ -86,9 +87,24 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback); ...@@ -86,9 +87,24 @@ int32_t tsdbFSEnd(STsdbFS *pFS, int8_t rollback);
// SDataFWriter // SDataFWriter
typedef struct SDataFWriter SDataFWriter; typedef struct SDataFWriter SDataFWriter;
int32_t tsdbDataFWriterOpen(SDataFWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFWriterClose(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, int64_t *rOffset, int64_t *rSize);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SColDataBlock *pBlockData, uint8_t **ppBuf, int64_t *rOffset,
int64_t *rSize);
int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize);
// SDataFReader // SDataFReader
typedef struct SDataFReader SDataFReader; typedef struct SDataFReader SDataFReader;
int32_t tsdbDataFReaderOpen(SDataFReader *pReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader *pReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadBlockData(SDataFReader *pReader, SBlock *pBlock, SColDataBlock *pBlockData, uint8_t **ppBuf);
int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA);
// SDelFWriter // SDelFWriter
typedef struct SDelFWriter SDelFWriter; typedef struct SDelFWriter SDelFWriter;
...@@ -125,29 +141,9 @@ typedef struct SAggrBlkCol SAggrBlkCol; ...@@ -125,29 +141,9 @@ typedef struct SAggrBlkCol SAggrBlkCol;
typedef struct SBlockData SBlockData; typedef struct SBlockData SBlockData;
typedef struct SReadH SReadH; typedef struct SReadH SReadH;
typedef struct SDFileSetReader SDFileSetReader;
typedef struct SDFileSetWriter SDFileSetWriter;
// SDFileSetWriter
// int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet);
// int32_t tsdbDFileSetWriterClose(SDFileSetWriter *pWriter, int8_t sync);
// int32_t tsdbWriteBlockData(SDFileSetWriter *pWriter, SDataCols *pDataCols, SBlock *pBlock);
// int32_t tsdbWriteSBlockInfo(SDFileSetWriter *pWriter, SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx);
// int32_t tsdbWriteSBlockIdx(SDFileSetWriter *pWriter, SBlockIdx *pBlockIdx);
// SDFileSetReader
// int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet);
// int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader);
// int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray);
// int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo);
// int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis);
// SDelFWriter
// SDelFReader
// tsdbUtil.c ============================================================================================== // tsdbUtil.c ==============================================================================================
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size); int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size);
void tsdbFree(uint8_t *pBuf); void tsdbFree(uint8_t *pBuf);
...@@ -284,7 +280,7 @@ struct TSDBROW { ...@@ -284,7 +280,7 @@ struct TSDBROW {
}; };
}; };
struct SBlockIdxItem { struct SBlockIdx {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
TSDBKEY minKey; TSDBKEY minKey;
...@@ -295,15 +291,6 @@ struct SBlockIdxItem { ...@@ -295,15 +291,6 @@ struct SBlockIdxItem {
int64_t size; int64_t size;
}; };
struct SBlockIdx {
int64_t suid;
int64_t uid;
uint32_t delimiter;
SOffset offset;
uint32_t nData;
uint8_t *pData;
};
typedef enum { typedef enum {
TSDB_SBLK_VER_0 = 0, TSDB_SBLK_VER_0 = 0,
TSDB_SBLK_VER_MAX, TSDB_SBLK_VER_MAX,
...@@ -365,26 +352,6 @@ struct SBlockData { ...@@ -365,26 +352,6 @@ struct SBlockData {
typedef void SAggrBlkData; // SBlockCol cols[]; typedef void SAggrBlkData; // SBlockCol cols[];
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t minutes, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1);
} else {
return (int)((key / tsTickPerMin[precision] / minutes));
}
}
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
// ================== TSDB global config // ================== TSDB global config
extern bool tsdbForceKeepFile; extern bool tsdbForceKeepFile;
...@@ -457,6 +424,7 @@ struct SMapData { ...@@ -457,6 +424,7 @@ struct SMapData {
uint8_t *pOfst; uint8_t *pOfst;
uint32_t nData; uint32_t nData;
uint8_t *pData; uint8_t *pData;
uint8_t *pBuf;
}; };
struct SColData { struct SColData {
...@@ -474,6 +442,21 @@ struct SColDataBlock { ...@@ -474,6 +442,21 @@ struct SColDataBlock {
SColData *aColData; SColData *aColData;
}; };
typedef struct {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SColSMA;
struct SBlockSMA {
int32_t nCol;
SColSMA *aColSMA;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -15,9 +15,7 @@ ...@@ -15,9 +15,7 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct SCommitter SCommitter; typedef struct {
struct SCommitter {
STsdb *pTsdb; STsdb *pTsdb;
uint8_t *pBuf1; uint8_t *pBuf1;
uint8_t *pBuf2; uint8_t *pBuf2;
...@@ -29,15 +27,15 @@ struct SCommitter { ...@@ -29,15 +27,15 @@ struct SCommitter {
int8_t precision; int8_t precision;
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
TSKEY nextCommitKey;
// commit file data // commit file data
TSKEY nextKey;
int32_t commitFid; int32_t commitFid;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SDFileSetReader *pReader; SDFileSetReader *pReader;
SMapData oBlockIdx; // SMapData<SBlockIdx>, read from reader
SDFileSetWriter *pWriter; SDFileSetWriter *pWriter;
SMapData oBlockIdx; SMapData nBlockIdx; // SMapData<SBlockIdx>, build by committer
SMapData nBlockIdx;
// commit table data // commit table data
STbDataIter iter; STbDataIter iter;
STbDataIter *pIter; STbDataIter *pIter;
...@@ -57,7 +55,7 @@ struct SCommitter { ...@@ -57,7 +55,7 @@ struct SCommitter {
SDelData delDataNew; SDelData delDataNew;
SDelIdxItem delIdxItem; SDelIdxItem delIdxItem;
/* commit cache */ /* commit cache */
}; } SCommitter;
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter); static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter); static int32_t tsdbCommitData(SCommitter *pCommitter);
...@@ -81,8 +79,15 @@ _err: ...@@ -81,8 +79,15 @@ _err:
int32_t tsdbCommit(STsdb *pTsdb) { int32_t tsdbCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
SCommitter commith = {0}; SCommitter commith;
int fid; SMemTable *pMemTable = pTsdb->mem;
// check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { // TODO
pTsdb->mem = NULL;
tsdbMemTableDestroy(pMemTable);
goto _exit;
}
// start commit // start commit
code = tsdbStartCommit(pTsdb, &commith); code = tsdbStartCommit(pTsdb, &commith);
...@@ -112,9 +117,11 @@ int32_t tsdbCommit(STsdb *pTsdb) { ...@@ -112,9 +117,11 @@ int32_t tsdbCommit(STsdb *pTsdb) {
goto _err; goto _err;
} }
_exit:
return code; return code;
_err: _err:
tsdbEndCommit(&commith, code);
tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d, failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
...@@ -122,6 +129,7 @@ _err: ...@@ -122,6 +129,7 @@ _err:
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
memset(pCommitter, 0, sizeof(*pCommitter));
ASSERT(pTsdb->mem && pTsdb->imem == NULL); ASSERT(pTsdb->mem && pTsdb->imem == NULL);
// lock(); // lock();
pTsdb->imem = pTsdb->mem; pTsdb->imem = pTsdb->mem;
...@@ -133,36 +141,24 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -133,36 +141,24 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
return code; return code;
} }
static int32_t tsdbCommitDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitDataEnd(SCommitter *pCommitter);
static int32_t tsdbCommitData(SCommitter *pCommitter) { static int32_t tsdbCommitData(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
// no data, just return // check
if (pMemTable->nRow == 0) { if (pMemTable->nRow == 0) {
goto _exit; goto _exit;
} }
// start // loop
code = tsdbCommitDataStart(pCommitter); pCommitter->nextKey = pMemTable->minKey.ts;
if (code) { while (pCommitter->nextKey < TSKEY_MAX) {
goto _err; pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
} tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
// commit code = tsdbCommitFileData(pCommitter);
code = tsdbCommitDataImpl(pCommitter); if (code) goto _err;
if (code) {
goto _err;
}
// end
code = tsdbCommitDataEnd(pCommitter);
if (code) {
goto _err;
} }
_exit: _exit:
...@@ -359,40 +355,6 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -359,40 +355,6 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
return code; return code;
} }
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
pCommitter->nextCommitKey = pMemTable->minKey.ts;
return code;
}
static int32_t tsdbCommitFileData(SCommitter *pCommitter);
static int32_t tsdbCommitDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
while (pCommitter->nextCommitKey < TSKEY_MAX) {
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision);
code = tsdbCommitFileData(pCommitter);
if (code) goto _err;
}
_exit:
return code;
_err:
return code;
}
static int32_t tsdbCommitDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter); static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter); static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter);
...@@ -430,10 +392,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -430,10 +392,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SDFileSet *pRSet = NULL; // TODO SDFileSet *pRSet = NULL; // TODO
SDFileSet *pWSet = NULL; // TODO SDFileSet *pWSet = NULL; // TODO
// memory
tMapDataReset(&pCommitter->oBlockIdx);
tMapDataReset(&pCommitter->nBlockIdx);
// load old // load old
pCommitter->oBlockIdx.nItem = 0;
pCommitter->oBlockIdx.flag = 0;
pCommitter->oBlockIdx.nData = 0;
if (pRSet) { if (pRSet) {
code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet); code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
if (code) goto _err; if (code) goto _err;
...@@ -443,9 +406,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { ...@@ -443,9 +406,6 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
} }
// create new // create new
pCommitter->nBlockIdx.nItem = 0;
pCommitter->nBlockIdx.flag = 0;
pCommitter->nBlockIdx.nData = 0;
code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet);
if (code) goto _err; if (code) goto _err;
......
...@@ -140,6 +140,7 @@ void tMapDataReset(SMapData *pMapData) { ...@@ -140,6 +140,7 @@ void tMapDataReset(SMapData *pMapData) {
void tMapDataClear(SMapData *pMapData) { void tMapDataClear(SMapData *pMapData) {
tsdbFree(pMapData->pOfst); tsdbFree(pMapData->pOfst);
tsdbFree(pMapData->pData); tsdbFree(pMapData->pData);
tsdbFree(pMapData->pBuf);
} }
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
...@@ -741,6 +742,23 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) { ...@@ -741,6 +742,23 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
} }
} }
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * minutes * tsTickPerMin[precision];
*maxKey = *minKey + minutes * tsTickPerMin[precision] - 1;
}
// int tsdFidLevel(int fid, TSKEY now, minute) {
// if (fid >= pRtn->maxFid) {
// return 0;
// } else if (fid >= pRtn->midFid) {
// return 1;
// } else if (fid >= pRtn->minFid) {
// return 2;
// } else {
// return -1;
// }
// }
// TSDBROW ====================================================== // TSDBROW ======================================================
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
// TODO // TODO
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册