提交 1160f530 编写于 作者: H Hongze Cheng

more code

上级 5e9d263e
...@@ -370,12 +370,12 @@ struct STsdb { ...@@ -370,12 +370,12 @@ struct STsdb {
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
SMemTable *mem; SMemTable *mem;
SMemTable *imem; SMemTable *imem;
STsdbFS fs; STsdbFS fs; // old
SLRUCache *lruCache; SLRUCache *lruCache;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
struct STFileSystem *pFS; struct STFileSystem *pFS; // new
SRocksCache rCache; SRocksCache rCache;
}; };
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
typedef TARRAY2(SSttBlk) TSttBlkArray; typedef TARRAY2(SSttBlk) TSttBlkArray;
typedef TARRAY2(SStatisBlk) TStatisBlkArray; typedef TARRAY2(SStatisBlk) TStatisBlkArray;
typedef TARRAY2(STombBlk) TTombBlkArray;
// SSttFileReader ========================================== // SSttFileReader ==========================================
typedef struct SSttFileReader SSttFileReader; typedef struct SSttFileReader SSttFileReader;
......
...@@ -123,13 +123,130 @@ _exit: ...@@ -123,13 +123,130 @@ _exit:
return code; return code;
} }
static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
SArray *aDelData = NULL;
int64_t minKey, maxKey;
STombBlock tombBlock[1] = {0};
TTombBlkArray tombBlkArray[1] = {0};
STsdbFD *fd = NULL;
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) {
SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i);
code = tsdbReadDelData(reader, pDelIdx, aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) {
SDelData *pDelData = taosArrayGet(aDelData, j);
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
continue;
}
STombRecord record = {
.suid = pDelIdx->suid,
.uid = pDelIdx->uid,
.version = pDelData->version,
.skey = pDelData->sKey,
.ekey = pDelData->eKey,
};
code = tTombBlockPut(tombBlock, &record);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
if (fd == NULL) {
STFile file = {
.type = TSDB_FTYPE_TOMB,
.did = {0}, // TODO
.fid = fset->fid,
.cid = 0, // TODO
};
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize,
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd);
TSDB_CHECK_CODE(code, lino, _exit);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE);
TSDB_CHECK_CODE(code, lino, _exit);
fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr);
}
// TODO
tTombBlockClear(tombBlock);
}
}
}
if (TOMB_BLOCK_SIZE(tombBlock) > 0) {
// TODO
tTombBlockClear(tombBlock);
}
if (TARRAY2_SIZE(tombBlkArray) > 0) {
// TODO
}
if (fd) {
code = tsdbFsyncFile(fd);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbCloseFile(&fd);
}
TARRAY2_DESTROY(tombBlkArray, NULL);
tTombBlockDestroy(tombBlock);
taosArrayDestroy(aDelData);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) { static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// TODO SDelFReader *reader = NULL;
SArray *aDelIdx = NULL;
if ((aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDelFReaderOpen(&reader, pDelFile, tsdb);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadDelIdx(reader, aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(aDelIdx) > 0) {
STFileSet *fset;
TARRAY2_FOREACH(fileSetArray, fset) {
code = tsdbDumpTombDataToFSet(tsdb, reader, aDelIdx, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
tsdbDelFReaderClose(&reader);
taosArrayDestroy(aDelIdx);
ASSERT(0);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
...@@ -166,9 +283,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { ...@@ -166,9 +283,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
current_fname(tsdb, fname, TSDB_FCURRENT); current_fname(tsdb, fname, TSDB_FCURRENT);
code = save_fs(fileSetArray, NULL); code = save_fs(fileSetArray, fname);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
......
...@@ -58,6 +58,8 @@ typedef struct { ...@@ -58,6 +58,8 @@ typedef struct {
int8_t rsvd[7]; int8_t rsvd[7];
} STombBlk; } STombBlk;
typedef TARRAY2(STombBlk) TTombBlkArray;
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) #define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
int32_t tTombBlockInit(STombBlock *tombBlock); int32_t tTombBlockInit(STombBlock *tombBlock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册