提交 5d54e855 编写于 作者: H Hongze Cheng

partial work

上级 24b9f1eb
......@@ -365,14 +365,17 @@ typedef struct {
#define TSDB_FILE_F(tf) (&((tf)->f)))
#define TSDB_FILE_FD(tf) ((tf)->fd)
int tsdbOpenFS(STsdbRepo *pRepo);
void tsdbCloseFS(STsdbRepo *pRepo);
int tsdbFSNewTxn(STsdbRepo *pRepo);
int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError);
int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile);
int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet);
void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid);
int tsdbRemoveDFileSet(SDFileSet *pSet);
int tsdbOpenFS(STsdbRepo* pRepo);
void tsdbCloseFS(STsdbRepo* pRepo);
int tsdbFSNewTxn(STsdbRepo* pRepo);
int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
int tsdbRemoveDFileSet(SDFileSet* pSet);
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
int code = pthread_rwlock_rdlock(&(pFs->lock));
......@@ -401,6 +404,31 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
return 0;
}
// ================= tsdbStore.c
#define KVSTORE_FILE_VERSION ((uint32_t)0)
typedef int (*iterFunc)(void*, void* cont, int contLen);
typedef void (*afterFunc)(void*);
typedef struct {
SMFile f;
SHashObj* map;
iterFunc iFunc;
afterFunc aFunc;
void* appH;
} SKVStore;
#define KVSTORE_MAGIC(s) (s)->f.info.magic
int tdCreateKVStore(char* fname);
int tdDestroyKVStore(char* fname);
SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH);
void tdCloseKVStore(SKVStore* pStore);
int tdKVStoreStartCommit(SKVStore* pStore);
int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen);
int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore* pStore);
void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size);
// ================= tsdbFile.c
// extern const char* tsdbFileSuffix[];
......@@ -467,7 +495,7 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// } SFileGroupIter;
// #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname)
// #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
// #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
// #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId
// #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId
......@@ -496,7 +524,7 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
// void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
// void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup);
// void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
// ================= tsdbMain.c
......
......@@ -15,7 +15,14 @@
#include "tsdbMain.h"
typedef struct {
SFidGroup fidg;
int minFid;
int midFid;
int maxFid;
TSKEY minKey;
} SRtn;
typedef struct {
SRtn rtn;
SCommitIter *iters;
SRWHelper whelper;
SDataCols * pDataCols;
......@@ -32,6 +39,8 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key);
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch);
static void tsdbDestroyCommitH(SCommitH *pch, int niter);
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
static int tsdbGetFidLevel(int fid, SRtn *pRtn);
void *tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pRepo) < 0) {
......@@ -64,45 +73,28 @@ _err:
static int tsdbCommitTSData(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
SCommitH ch = {0};
STsdbCfg * pCfg = &(pRepo->config);
// SFidGroup fidGroup = {0};
TSKEY minKey = 0;
TSKEY maxKey = 0;
SCommitH ch = {0};
if (pMem->numOfRows <= 0) return 0;
tsdbGetFidGroup(pCfg, &(ch.fidg));
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, ch.fidg.minFid, &minKey, &maxKey);
tsdbRemoveFilesBeyondRetention(pRepo, &(ch.fidg));
if (tsdbInitCommitH(pRepo, &ch) < 0) {
goto _err;
return -1;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// TODO
int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/);
tsdbSeekCommitIter(ch.iters, pMem->maxTables, minKey);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (fid < ch.fidg.minFid) continue;
if (tsdbCommitToFile(pRepo, fid, &(ch)) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
if (tsdbCommitToFile(pRepo, fid, &ch) < 0) {
tsdbDestroyCommitH(&ch, pMem->maxTables);
return -1;
}
}
tsdbApplyRetention(pRepo, &(ch.fidg));
tsdbDestroyCommitH(&ch, pMem->maxTables);
return 0;
_err:
tsdbDestroyCommitH(&ch, pMem->maxTables);
return -1;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
......@@ -148,7 +140,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
}
// TODO: update meta file
tsdbUpdateMFile(pRepo, NULL);
tsdbUpdateMFile(pRepo, &(pMeta->pStore.f));
return 0;
......@@ -195,116 +187,56 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH * pFileH = pRepo->tsdbFileH;
SFileGroup * pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
TSKEY minKey = 0;
TSKEY maxKey = 0;
SCommitIter *iters = pch->iters;
SRWHelper * pHelper = &(pch->whelper);
SDataCols * pDataCols = pch->pDataCols;
STsdbCfg * pCfg = &(pRepo->config);
SMemTable *pMem = pRepo->imem;
TSKEY minKey, maxKey;
SDFileSet *pOldSet = NULL;
SDFileSet newSet = {0};
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// Check if there are data to commit to this file
if (!tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey)) {
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
return 0;
}
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
pGroup = tsdbCreateFGroup(pRepo, fid, tsdbGetFidLevel(fid, pch->fidg));
if (pGroup == NULL) {
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
return -1;
if (pOldSet) { // file exists
int level = tsdbGetFidLevel(fid, &(pch->rtn));
if (level < 0) { // if out of data, remove it and ignore expired memory data
tsdbRemoveExpiredDFileSet(pRepo, fid);
tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1);
return 0;
}
}
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
// Move the data file set to correct level
tsdbMoveDFileSet(pOldSet, level);
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) {
if (tsdbSetAndOpenHelperFile(&(pch->whelper), pOldSet, &newSet) < 0) return -1;
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SBlockIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbLoadCompIdx(&pch->whelper, NULL) < 0) return -1;
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
TSDB_RLOCK_TABLE(pIter->pTable);
for (int tid = 0; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = pch->iters + tid;
if (pIter->pTable == NULL) continue;
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (tsdbSetHelperTable(&(pch->whelper), pIter->pTable, pRepo) < 0) return -1;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
TSDB_RLOCK_TABLE(pIter->pTable);
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
if (pIter->pIter != NULL) { // has data in memory to commit
}
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
TSDB_RUNLOCK_TABLE(pIter->pTable);
if (tsdbMoveLastBlockIfNeccessary() < 0) return -1;
// Write the SBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
if (tsdbWriteCompInfo() < 0) return -1;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
if (tsdbWriteCompIdx() < 0) return -1;
}
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
// tfsremove(&(helperHeadF(pHelper)->file));
(void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper)));
tfsDecDiskFile(helperNewHeadF(pHelper)->file.level, helperNewHeadF(pHelper)->file.id, 1);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
if (newLast) {
(void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper)));
tfsDecDiskFile(helperNewLastF(pHelper)->file.level, helperNewLastF(pHelper)->file.id, 1);
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
} else {
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
if (/*file exists OR has data to commit*/) {
tsdbUpdateDFileSet(pRepo, &newSet);
}
pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
tsdbCloseHelperFile(pHelper, 1, pGroup);
return -1;
}
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
......@@ -399,4 +331,31 @@ static void tsdbDestroyCommitH(SCommitH *pch, int niter) {
tdFreeDataCols(pch->pDataCols);
tsdbDestroyCommitIters(pch->iters, niter);
tsdbDestroyHelper(&(pch->whelper));
}
static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
STsdbCfg *pCfg = &(pRepo->config);
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = TSDB_KEY_FILEID(minKey, pCfg->daysPerFile, pCfg->precision);
pRtn->midFid = TSDB_KEY_FILEID(midKey, pCfg->daysPerFile, pCfg->precision);
pRtn->maxFid = TSDB_KEY_FILEID(maxKey, pCfg->daysPerFile, pCfg->precision);
}
static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) {
return 0;
} else if (fid >= pRtn->midFid) {
return 1;
} else if (fid >= pRtn->minFid) {
return 2;
} else {
return -1;
}
}
\ No newline at end of file
......@@ -98,7 +98,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) {
return -1;
}
} else {
int index = TARRAY_ELEM_IDX(dfArray, ptr);
int index = TARRAY_ELEM_IDX(pSnapshot->df, pOldSet);
if (pOldSet->id == pSet->id) {
taosArraySet(pSnapshot->df, index, pSet);
......@@ -125,6 +125,32 @@ void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) {
}
}
int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) {
// TODO
}
static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) {
// TODO
return 0;
......@@ -152,28 +178,6 @@ static int tsdbOpenFSImpl(STsdbRepo *pRepo) {
return 0;
}
static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) {
int tlen = 0;
tlen += taosEncodeVariantI64(buf, pInfo->size);
tlen += taosEncodeVariantI64(buf, pInfo->tombSize);
tlen += taosEncodeVariantI64(buf, pInfo->nRecords);
tlen += taosEncodeVariantI64(buf, pInfo->nDels);
tlen += taosEncodeFixedU32(buf, pInfo->magic);
return tlen;
}
static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
buf = taosDecodeVariantI64(buf, &(pInfo->size));
buf = taosDecodeVariantI64(buf, &(pInfo->tombSize));
buf = taosDecodeVariantI64(buf, &(pInfo->nRecords));
buf = taosDecodeVariantI64(buf, &(pInfo->nDels));
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
return buf;
}
static int tsdbEncodeMFile(void **buf, SMFile *pMFile) {
int tlen = 0;
......
......@@ -15,20 +15,8 @@
#define _DEFAULT_SOURCE
#define TAOS_RANDOM_FILE_FAIL_TEST
#include "os.h"
#include "hash.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tkvstore.h"
#include "tulog.h"
#define TD_KVSTORE_HEADER_SIZE 512
#define TD_KVSTORE_MAJOR_VERSION 1
#define TD_KVSTORE_MAINOR_VERSION 0
#define TD_KVSTORE_SNAP_SUFFIX ".snap"
#define TD_KVSTORE_NEW_SUFFIX ".new"
#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF
#include "tsdbMain.h"
typedef struct {
uint64_t uid;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_KVSTORE_H_
#define _TD_KVSTORE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#define KVSTORE_FILE_VERSION ((uint32_t)0)
typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int64_t size; // including 512 bytes of header size
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
uint32_t magic;
} SStoreInfo;
typedef struct {
char * fname;
int fd;
SHashObj * map;
iterFunc iFunc;
afterFunc aFunc;
void * appH;
SStoreInfo info;
} SKVStore;
#define KVSTORE_MAGIC(s) (s)->info.magic
int tdCreateKVStore(char *fname);
int tdDestroyKVStore(char *fname);
SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH);
void tdCloseKVStore(SKVStore *pStore);
int tdKVStoreStartCommit(SKVStore *pStore);
int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen);
int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid);
int tdKVStoreEndCommit(SKVStore *pStore);
void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size);
#ifdef __cplusplus
}
#endif
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册