提交 efa09d44 编写于 作者: H Hongze Cheng

more progress

上级 0a5200f0
...@@ -19,7 +19,14 @@ ...@@ -19,7 +19,14 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -68,16 +68,15 @@ typedef struct { ...@@ -68,16 +68,15 @@ typedef struct {
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
#if 0 STsdbFS *tsdbNewFS(int keep, int days);
int tsdbOpenFS(STsdbRepo* pRepo); void * tsdbFreeFS(STsdbFS *pfs);
void tsdbCloseFS(STsdbRepo* pRepo); int tdbOpenFS(STsdbFS *pFs, int keep, int days);
int tsdbFSNewTxn(STsdbRepo* pRepo); void tsdbCloseFS(STsdbFS *pFs);
int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); int tsdbStartTxn(STsdbFS *pfs);
int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); int tsdbEndTxn(STsdbFS *pfs);
int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); int tsdbEndTxnWithError(STsdbFS *pfs);
int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter); void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
SDFileSet* tsdbFSIterNext(SFSIter* pIter); int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
#endif
static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock)); int code = pthread_rwlock_rdlock(&(pFs->lock));
......
...@@ -85,56 +85,151 @@ _err: ...@@ -85,56 +85,151 @@ _err:
// =================== Commit Meta Data // =================== Commit Meta Data
static int tsdbCommitMeta(STsdbRepo *pRepo) { static int tsdbCommitMeta(STsdbRepo *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
SMemTable *pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
STsdbMeta *pMeta = pRepo->tsdbMeta; SMFile * pOMFile = pfs->cstatus->pmf;
SMFile mf;
SActObj * pAct = NULL; SActObj * pAct = NULL;
SActCont * pCont = NULL; SActCont * pCont = NULL;
SListNode *pNode = NULL;
if (listNEles(pMem->actList) <= 0) return 0; ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0);
if (tdKVStoreStartCommit(pMeta->pStore) < 0) { if (listNEles(pMem->actList) <= 0) {
tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); // no
goto _err; tsdbUpdateMFile(pfs, pOMFile);
} return 0;
} else {
// Create/Open a meta file or open the existing file
if (pOMFile == NULL) {
// Create a new meta file
tsdbInitMFile(&mf, {.level = TFS_PRIMARY_LEVEL, .id = TFS_PRIMARY_ID}, REPO_ID(pRepo), pfs->nstatus->meta.version);
SListNode *pNode = NULL; if (tsdbCreateMFile(&mf) < 0) {
return -1;
}
} else {
tsdbInitMFile(&mf, pOMFile);
if (tsdbOpenMFile(&mf, O_WRONLY) < 0) {
return -1;
}
}
}
// Loop to write
while ((pNode = tdListPopHead(pMem->actList)) != NULL) { while ((pNode = tdListPopHead(pMem->actList)) != NULL) {
pAct = (SActObj *)pNode->data; pAct = (SActObj *)pNode->data;
if (pAct->act == TSDB_UPDATE_META) { if (pAct->act == TSDB_UPDATE_META) {
pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj));
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, tsdbCloseMFile(&mf);
tstrerror(terrno)); return -1;
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
} }
} else if (pAct->act == TSDB_DROP_META) { } else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, tsdbCloseMFile(&mf);
tstrerror(terrno)); return -1;
tdKVStoreEndCommit(pMeta->pStore);
goto _err;
} }
} else { } else {
ASSERT(false); ASSERT(false);
} }
} }
if (tdKVStoreEndCommit(pMeta->pStore) < 0) { if (tsdbUpdateMFileHeader(&mf) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1;
goto _err;
} }
// TODO: update meta file tsdbCloseMFile(&mf);
tsdbUpdateMFile(pRepo, &(pMeta->pStore.f)); tsdbUpdateMFile(pfs, &mf);
return 0; return 0;
}
_err: int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) {
return -1; int tlen = 0;
tlen += taosEncodeFixedU64(buf, pRecord->uid);
tlen += taosEncodeFixedI64(buf, pRecord->offset);
tlen += taosEncodeFixedI64(buf, pRecord->size);
return tlen;
}
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) {
buf = taosDecodeFixedU64(buf, &(pRecord->uid));
buf = taosDecodeFixedI64(buf, &(pRecord->offset));
buf = taosDecodeFixedI64(buf, &(pRecord->size));
return buf;
}
static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen) {
char buf[64] = "\0";
void * pBuf = buf;
SKVRecord rInfo;
int64_t offset;
// Seek to end of meta file
offset = tsdbSeekMFile(pMFile, 0, SEEK_END);
if (offset < 0) {
return -1;
}
rInfo.offset = offset;
rInfo.uid = uid;
rInfo.size = contLen;
tlen = tsdbEncodeKVRecord((void **)(&pBuf), pRInfo);
if (tsdbAppendMFile(pMFile, buf, tlen) < tlen) {
return -1;
}
if (tsdbAppendMFile(pMFile, cont, contLen) < contLen) {
return -1;
}
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)&uid, sizeof(uid));
if (pRecord != NULL) {
pMFile->info.tombSize += pRecord->size;
} else {
pMFile->info.nRecords++;
}
taosHashPut(pfs->metaCache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
return 0;
} }
static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
SKVRecord rInfo = {0};
char buf[128] = "\0";
SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid));
if (pRecord == NULL) {
tsdbError("failed to drop KV store record with key %" PRIu64 " since not find", uid);
return -1;
}
rInfo.offset = -pRecord->offset;
rInfo.uid = pRecord->uid;
rInfo.size = pRecord->size;
void *pBuf = buf;
tdEncodeKVRecord(&pBuf, &rInfo);
if (tsdbAppendMFile(pMFile, buf, POINTER_DISTANCE(pBuf, buf), NULL) < 0) {
return -1;
}
pMFile->meta.magic = taosCalcChecksum(pStore->info.magic, (uint8_t *)buf, (uint32_t)POINTER_DISTANCE(pBuf, buf));
pMFile->meta.nDels++;
pMFile->meta.nRecords--;
pMFile->meta.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid));
return 0;
}
// =================== Commit Time-Series Data // =================== Commit Time-Series Data
static int tsdbCommitTSData(STsdbRepo *pRepo) { static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
......
...@@ -18,12 +18,6 @@ ...@@ -18,12 +18,6 @@
#include "tsdbint.h" #include "tsdbint.h"
typedef struct {
uint64_t uid;
int64_t offset;
int64_t size;
} SKVRecord;
static int tdInitKVStoreHeader(int fd, char *fname); static int tdInitKVStoreHeader(int fd, char *fname);
static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo); static int tdEncodeStoreInfo(void **buf, SStoreInfo *pInfo);
static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo); static void * tdDecodeStoreInfo(void *buf, SStoreInfo *pInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册