提交 2a754394 编写于 作者: H Hongze Cheng

more code

上级 3f4b4b52
......@@ -354,21 +354,19 @@ typedef struct {
} SRocksCache;
struct STsdb {
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
SMemTable *mem;
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
#ifdef USE_DEV_CODE
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
SMemTable *mem;
SMemTable *imem;
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
struct STFileSystem *pFS;
#endif
SRocksCache rCache;
SRocksCache rCache;
};
struct TSDBKEY {
......
......@@ -35,9 +35,12 @@ typedef enum {
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback);
int32_t tsdbCloseFS(STFileSystem **ppFS);
// txn
int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype);
int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid);
int32_t tsdbFSEditBegin(STFileSystem *fs, int64_t eid, const SArray *aFileOp, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *pFS);
int32_t tsdbFSEditAbort(STFileSystem *pFS);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **ppFSet);
/* Exposed Structs */
struct STFileSystem {
......
......@@ -40,6 +40,8 @@ int32_t tsdbFileSetCreate(int32_t fid, STFileSet **ppSet);
int32_t tsdbFileSetEdit(STFileSet *pSet, STFileOp *pOp);
int32_t tsdbEditFileSet(STFileSet *pFileSet, const STFileOp *pOp);
int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2);
struct STFileOp {
tsdb_fop_t op;
int32_t fid;
......
......@@ -48,8 +48,8 @@ struct STFile {
tsdb_ftype_t type;
SDiskID did;
int32_t fid;
int64_t cid;
int32_t fid; // file id
int64_t cid; // commit id
int64_t size;
union {
struct {
......
......@@ -42,6 +42,14 @@ int32_t tsdbSttFSegReadStatisBlock(SSttFSegReader *pSegReader, const void *pBloc
int32_t tsdbSttFSegReadDelBlock(SSttFSegReader *pSegReader, const void *pBlock);
int32_t tsdbSttFSegReadSttBlock(SSttFSegReader *pSegReader, const void *pBlock);
struct SSttFileReaderConfig {
STsdb *pTsdb;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
uint8_t **aBuf;
// TODO
};
// SSttFWriter ==========================================
typedef struct SSttFileWriter SSttFileWriter;
typedef struct SSttFileWriterConfig SSttFileWriterConfig;
......@@ -51,7 +59,6 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFi
int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow);
int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData);
/* ------------------------------------------------- */
struct SSttFileWriterConfig {
STsdb *pTsdb;
STFile file;
......@@ -63,14 +70,6 @@ struct SSttFileWriterConfig {
uint8_t **aBuf;
};
struct SSttFileReaderConfig {
STsdb *pTsdb;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
uint8_t **aBuf;
// TODO
};
#ifdef __cplusplus
}
#endif
......
......@@ -29,14 +29,15 @@ typedef struct {
SArray *aTbDataP; // SArray<STbData *>
SArray *aFileOp; // SArray<STFileOp>
int64_t eid; // edit id
// context
TSKEY nextKey;
int32_t fid;
int32_t expLevel;
TSKEY minKey;
TSKEY maxKey;
STFileSet *pFileSet;
TSKEY nextKey;
int32_t fid;
int32_t expLevel;
TSKEY minKey;
TSKEY maxKey;
const STFileSet *pFileSet;
// writer
SSttFileWriter *pWriter;
......@@ -44,14 +45,14 @@ typedef struct {
static int32_t open_committer_writer(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino;
STsdb *pTsdb = pCommitter->pTsdb;
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb;
int32_t vid = TD_VID(pTsdb->pVnode);
SSttFileWriterConfig conf = {
SSttFileWriterConfig config = {
.pTsdb = pCommitter->pTsdb,
.maxRow = pCommitter->maxRow,
.szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize,
.szPage = pTsdb->pVnode->config.tsdbPageSize,
.cmprAlg = pCommitter->cmprAlg,
.pSkmTb = NULL,
.pSkmRow = NULL,
......@@ -59,41 +60,39 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
};
if (pCommitter->pFileSet) {
ASSERTS(0, "TODO: Not implemented yet");
// TODO
ASSERT(0);
} else {
conf.file.type = TSDB_FTYPE_STT;
config.file.type = TSDB_FTYPE_STT;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &conf.file.did) < 0) {
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &config.file.did) < 0) {
code = TSDB_CODE_FS_NO_VALID_DISK;
TSDB_CHECK_CODE(code, lino, _exit);
}
conf.file.size = 0;
conf.file.cid = 1;
conf.file.fid = pCommitter->fid;
config.file.fid = pCommitter->fid;
config.file.cid = pCommitter->eid;
config.file.size = 0;
config.file.stt.lvl = 0;
config.file.stt.nseg = 0;
tsdbTFileInit(pTsdb, &conf.file);
tsdbTFileInit(pTsdb, &config.file);
}
code = tsdbSttFWriterOpen(&conf, &pCommitter->pWriter);
code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError( //
"vgId:%d %s failed at line %d since %s, fid:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code), //
pCommitter->fid);
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", vid, __func__, lino, tstrerror(code), pCommitter->fid);
}
return code;
}
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
if (pCommitter->pWriter == NULL) {
code = open_committer_writer(pCommitter);
......@@ -105,20 +104,10 @@ static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDB
_exit:
if (code) {
tsdbError( //
"vgId:%d failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
lino, //
tstrerror(code));
tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
} else {
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
tbid->suid, //
tbid->uid, //
TSDBROW_KEY(pRow).ts, //
TSDBROW_KEY(pRow).version);
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64, vid, __func__,
pCommitter->fid, tbid->suid, tbid->uid, TSDBROW_KEY(pRow).ts, TSDBROW_KEY(pRow).version);
}
return 0;
}
......@@ -131,15 +120,14 @@ static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int6
}
static int32_t commit_timeseries_data(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino;
int32_t code = 0;
int32_t lino = 0;
int64_t nRow = 0;
SMemTable *pMem = pCommitter->pTsdb->imem;
STsdb *pTsdb = pCommitter->pTsdb;
int32_t vid = TD_VID(pTsdb->pVnode);
SMemTable *pMem = pTsdb->imem;
if (pMem->nRow == 0) { // no time-series data to commit
goto _exit;
}
if (pMem->nRow == 0) goto _exit;
TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
......@@ -165,19 +153,9 @@ static int32_t commit_timeseries_data(SCommitter *pCommitter) {
_exit:
if (code) {
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug( //
"vgId:%d %s done, fid:%d nRow:%" PRId64, //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
nRow);
tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, vid, __func__, pCommitter->fid, nRow);
}
return code;
}
......@@ -186,6 +164,8 @@ static int32_t commit_delete_data(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino;
return 0;
ASSERTS(0, "TODO: Not implemented yet");
int64_t nDel = 0;
......@@ -221,29 +201,27 @@ _exit:
return code;
}
static int32_t start_commit_file_set(SCommitter *pCommitter) {
static int32_t commit_fset_start(SCommitter *pCommitter) {
STsdb *pTsdb = pCommitter->pTsdb;
int32_t vid = TD_VID(pTsdb->pVnode);
pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec());
pCommitter->nextKey = TSKEY_MAX;
pCommitter->pFileSet = NULL; // TODO: need to search the file system
tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->pFileSet);
tsdbDebug( //
"vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid, //
pCommitter->minKey, //
pCommitter->maxKey, //
pCommitter->expLevel);
tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid,
pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel);
return 0;
}
static int32_t end_commit_file_set(SCommitter *pCommitter) {
static int32_t commit_fset_end(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
if (pCommitter->pWriter == NULL) return 0;
......@@ -253,60 +231,43 @@ static int32_t end_commit_file_set(SCommitter *pCommitter) {
TSDB_CHECK_CODE(code, lino, _exit);
}
TSDB_CHECK_CODE( //
code = tsdbSttFWriterClose( //
&pCommitter->pWriter, //
0, //
pFileOp), //
lino, //
_exit);
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError( //
"vgId:%d failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
lino, //
tstrerror(code));
tsdbError("vgId:%d failed at line %d since %s", vid, lino, tstrerror(code));
} else {
tsdbDebug( //
"vgId:%d %s done, fid:%d", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
pCommitter->fid);
tsdbDebug("vgId:%d %s done, fid:%d", vid, __func__, pCommitter->fid);
}
return code;
}
static int32_t commit_next_file_set(SCommitter *pCommitter) {
static int32_t commit_fset(SCommitter *pCommitter) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(pCommitter->pTsdb->pVnode);
// fset commit start
code = start_commit_file_set(pCommitter);
code = commit_fset_start(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
// commit fset
code = commit_timeseries_data(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
/* TODO
code = commit_delete_data(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
*/
// fset commit end
code = end_commit_file_set(pCommitter);
code = commit_fset_end(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pCommitter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", vid, __func__);
}
return code;
}
......@@ -332,6 +293,7 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
taosArrayDestroy(pCommitter->aFileOp);
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid);
// start loop
pCommitter->nextKey = pTsdb->imem->minKey; // TODO
......@@ -347,35 +309,27 @@ _exit:
static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);
if (eno == 0) {
TSDB_CHECK_CODE( //
code = tsdbFSEditBegin( //
pCommiter->pTsdb->pFS, //
pCommiter->aFileOp, //
TSDB_FEDIT_COMMIT),
lino, //
_exit);
code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, pCommiter->eid, pCommiter->aFileOp, TSDB_FEDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERTS(0, "TODO: Not implemented yet");
// TODO
ASSERT(0);
}
// TODO: clear the committer
ASSERT(pCommiter->pWriter == NULL);
taosArrayDestroy(pCommiter->aTbDataP);
taosArrayDestroy(pCommiter->aFileOp);
_exit:
if (code) {
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pCommiter->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s, eid:%" PRId64, vid, __func__, lino, tstrerror(code),
pCommiter->eid);
} else {
tsdbDebug( //
"vgId:%d %s done", //
TD_VID(pCommiter->pTsdb->pVnode), //
__func__);
tsdbDebug("vgId:%d %s done, eid:%" PRId64, vid, __func__, pCommiter->eid);
}
return code;
}
......@@ -408,8 +362,11 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit);
while (committer.nextKey != TSKEY_MAX) {
code = commit_next_file_set(&committer);
if (code) break;
code = commit_fset(&committer);
if (code) {
lino = __LINE__;
break;
}
}
code = close_committer(&committer, code);
......@@ -427,37 +384,28 @@ _exit:
}
int32_t tsdbCommitCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
SMemTable *pMemTable = pTsdb->imem;
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(pTsdb->pVnode);
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
if (pTsdb->imem == NULL) goto _exit;
SMemTable *pMemTable = pTsdb->imem;
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSEditCommit(pTsdb->pFS);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) {
tsdbUnrefMemTable(pMemTable, NULL, true);
}
tsdbUnrefMemTable(pMemTable, NULL, true);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", //
TD_VID(pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", //
TD_VID(pTsdb->pVnode), __func__);
tsdbInfo("vgId:%d %s done", vid, __func__);
}
return code;
}
......@@ -465,21 +413,18 @@ _exit:
int32_t tsdbCommitAbort(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(pTsdb->pVnode);
if (pTsdb->imem == NULL) goto _exit;
code = tsdbFSEditAbort(pTsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", //
TD_VID(pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
tsdbError("vgId:%d, %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", //
TD_VID(pTsdb->pVnode), //
__func__);
tsdbInfo("vgId:%d %s done", vid, __func__);
}
return code;
}
\ No newline at end of file
......@@ -435,51 +435,18 @@ static int32_t edit_fs(STFileSystem *pFS, const SArray *aFileOp) {
int32_t code = 0;
int32_t lino;
taosArrayClearEx(pFS->nstate, NULL /* TODO */);
// TODO: copy current state to new state
STFileSet *pSet = NULL;
for (int32_t iop = 0; iop < taosArrayGetSize(aFileOp); iop++) {
struct STFileOp *pOp = taosArrayGet(aFileOp, iop);
struct STFileSet tmpSet = {.fid = pOp->fid};
int32_t idx = taosArraySearchIdx( //
pFS->nstate, //
&tmpSet, //
(__compar_fn_t)fset_cmpr_fn, //
TD_GE);
struct STFileSet *pSet;
if (idx < 0) {
pSet = NULL;
idx = taosArrayGetSize(pFS->nstate);
} else {
pSet = taosArrayGet(pFS->nstate, idx);
}
if (pSet == NULL || pSet->fid != pOp->fid) {
ASSERTS(pOp->op == TSDB_FOP_CREATE, "BUG: Invalid file operation");
TSDB_CHECK_CODE( //
code = tsdbFileSetCreate(pOp->fid, &pSet), //
lino, //
_exit);
struct STFileOp *op = taosArrayGet(aFileOp, iop);
if (taosArrayInsert(pFS->nstate, idx, pSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pSet == NULL || pSet->fid != op->fid) {
STFileSet fset = {.fid = op->fid};
pSet = taosArraySearch(pFS->nstate, &fset, (__compar_fn_t)tsdbFSetCmprFn, TD_EQ);
}
// do opration on file set
TSDB_CHECK_CODE( //
code = tsdbFileSetEdit(pSet, pOp), //
lino, //
_exit);
// TODO
}
// TODO: write new state to file
_exit:
return 0;
}
......@@ -511,7 +478,12 @@ int32_t tsdbCloseFS(STFileSystem **ppFS) {
return 0;
}
int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) {
int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) {
eid[0] = ++pFS->neid; // TODO: use atomic operation
return 0;
}
int32_t tsdbFSEditBegin(STFileSystem *fs, int64_t eid, const SArray *aFileOp, EFEditT etype) {
int32_t code = 0;
int32_t lino;
char current_t[TSDB_FILENAME_LEN];
......@@ -525,7 +497,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype)
tsem_wait(&fs->canEdit);
fs->etype = etype;
fs->eid = ++fs->neid;
fs->eid = eid;
// edit
code = edit_fs(fs, aFileOp);
......@@ -555,4 +527,10 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) {
int32_t code = abort_edit(fs);
tsem_post(&fs->canEdit);
return code;
}
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **ppFSet) {
STFileSet fset = {.fid = fid};
ppFSet[0] = taosArraySearch(fs->cstate, &fset, (__compar_fn_t)tsdbFSetCmprFn, TD_EQ);
return 0;
}
\ No newline at end of file
......@@ -131,4 +131,10 @@ int32_t tsdbEditFileSet(struct STFileSet *pFileSet, const struct STFileOp *pOp)
ASSERTS(0, "TODO: Not implemented yet");
// TODO
return code;
}
int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2) {
if (pSet1->fid < pSet2->fid) return -1;
if (pSet1->fid > pSet2->fid) return 1;
return 0;
}
\ No newline at end of file
......@@ -69,8 +69,8 @@ static int32_t tsdbCloseMerger(SMerger *pMerger) {
STsdb *pTsdb = pMerger->pTsdb;
code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit)
// code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE);
// TSDB_CHECK_CODE(code, lino, _exit)
_exit:
if (code) {
......
......@@ -470,27 +470,20 @@ static int32_t destroy_stt_fwriter(SSttFileWriter *pWriter) {
static int32_t open_stt_fwriter(SSttFileWriter *pWriter) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(pWriter->config.pTsdb->pVnode);
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
if (pWriter->tFile.size == 0) {
flag |= TD_FILE_CREATE | TD_FILE_TRUNC;
flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
}
code = tsdbOpenFile( //
pWriter->config.file.fname, //
pWriter->config.szPage, //
flag, //
&pWriter->pFd);
code = tsdbOpenFile(pWriter->config.file.fname, pWriter->config.szPage, flag, &pWriter->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->tFile.size == 0) {
code = tsdbWriteFile( //
pWriter->pFd, //
0, //
hdr, //
sizeof(hdr));
code = tsdbWriteFile(pWriter->pFd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
pWriter->tFile.size += sizeof(hdr);
......@@ -498,23 +491,11 @@ static int32_t open_stt_fwriter(SSttFileWriter *pWriter) {
_exit:
if (code) {
if (pWriter->pFd) {
tsdbCloseFile(&pWriter->pFd);
}
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
if (pWriter->pFd) tsdbCloseFile(&pWriter->pFd);
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
} else {
tsdbDebug( //
"vgId:%d %s done, fname:%s size:%" PRId64, //
TD_VID(pWriter->config.pTsdb->pVnode), //
__func__, //
pWriter->config.file.fname, //
pWriter->config.file.size //
);
tsdbDebug("vgId:%d %s done, fname:%s size:%" PRId64, vid, __func__, pWriter->config.file.fname,
pWriter->config.file.size);
}
return code;
}
......@@ -526,7 +507,8 @@ static int32_t close_stt_fwriter(SSttFileWriter *pWriter) {
int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(pConf->pTsdb->pVnode);
code = create_stt_fwriter(pConf, ppWriter);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -536,15 +518,11 @@ int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **p
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
if (ppWriter[0]) {
destroy_stt_fwriter(ppWriter[0]);
ppWriter[0] = NULL;
}
tsdbError( //
"vgId:%d %s failed at line %d since %s", //
TD_VID(pConf->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
}
return code;
}
......@@ -552,59 +530,41 @@ _exit:
int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFileOp *op) {
int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode);
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
if (!abort) {
if (ppWriter[0]->bData.nRow > 0) {
TSDB_CHECK_CODE( //
code = write_timeseries_block(ppWriter[0]), //
lino, //
_exit);
code = write_timeseries_block(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (ppWriter[0]->sData.nRow > 0) {
TSDB_CHECK_CODE( //
code = write_statistics_block(ppWriter[0]), //
lino, //
_exit);
code = write_statistics_block(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (ppWriter[0]->dData.nRow > 0) {
TSDB_CHECK_CODE( //
code = write_delete_block(ppWriter[0]), //
lino, //
_exit);
code = write_delete_block(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
}
TSDB_CHECK_CODE( //
code = write_stt_blk(ppWriter[0]), //
lino, //
_exit);
code = write_stt_blk(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_statistics_blk(ppWriter[0]), //
lino, //
_exit);
code = write_statistics_blk(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_del_blk(ppWriter[0]), //
lino, //
_exit);
code = write_del_blk(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_file_footer(ppWriter[0]), //
lino, //
_exit);
code = write_file_footer(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = write_file_header(ppWriter[0]), //
lino, //
_exit);
code = write_file_header(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE( //
code = tsdbFsyncFile(ppWriter[0]->pFd), //
lino, //
_exit);
code = tsdbFsyncFile(ppWriter[0]->pFd);
TSDB_CHECK_CODE(code, lino, _exit);
if (op) {
op->fid = ppWriter[0]->config.file.fid;
......@@ -618,10 +578,8 @@ int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct STFi
}
}
TSDB_CHECK_CODE( //
code = close_stt_fwriter(ppWriter[0]), //
lino, //
_exit);
code = close_stt_fwriter(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
destroy_stt_fwriter(ppWriter[0]);
ppWriter[0] = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册