提交 4b33695f 编写于 作者: H Hongze Cheng

more work

上级 4d8d1df2
......@@ -67,6 +67,7 @@ typedef struct STsdbFS STsdbFS;
typedef struct SRowMerger SRowMerger;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
#define HAS_NONE ((int8_t)0x1)
#define HAS_NULL ((int8_t)0x2)
......@@ -165,15 +166,8 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, S
TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ==============================================================================================
// SDataFSet
// SHeadFile
void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]);
// SDataFile
void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]);
// SLastFile
void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]);
// SSmaFile
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]);
enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE };
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]);
// SDelFile
#define tsdbDelFileCreate() \
((SDelFile){ \
......@@ -230,12 +224,10 @@ typedef struct {
TSKEY minKey;
} SRtn;
#define TSDB_DATA_DIR_LEN 6 // adapt accordingly
struct STsdb {
char *path;
SVnode *pVnode;
TdThreadMutex mutex;
char dir[TSDB_DATA_DIR_LEN];
bool repoLocked;
STsdbKeepCfg keepCfg;
SMemTable *mem;
......@@ -487,16 +479,12 @@ struct SSmaFile {
};
struct SDFileSet {
SDiskID diskId;
int32_t fid;
SHeadFile *pHeadFile;
SDataFile *pDataFile;
SLastFile *pLastFile;
SSmaFile *pSmaFile;
// SHeadFile headFile;
// SDataFile dataFile;
// SLastFile lastFile;
// SSmaFile smaFile;
SDiskID diskId;
int32_t fid;
SHeadFile fHead;
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
};
struct SRowIter {
......
......@@ -17,7 +17,8 @@
typedef struct {
SDelFile *pDelFile;
SArray *aDFileSet; // SArray<aDFileSet *>
SArray *aDFileSet; // SArray<aDFileSet>
SDelFile delFile;
} STsdbFSState;
struct STsdbFS {
......@@ -105,10 +106,10 @@ static int32_t tsdbDFileSetToJson(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "level", pDFileSet->diskId.level) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "id", pDFileSet->diskId.id) < 0) goto _err;
if (tjsonAddIntegerToObject(pJson, "fid", pDFileSet->fid) < 0) goto _err;
if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err;
if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err;
if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err;
if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "head", tsdbHeadFileToJson, pDFileSet->pHeadFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "data", tsdbDataFileToJson, pDFileSet->pDataFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "last", tsdbLastFileToJson, pDFileSet->pLastFile) < 0) goto _err;
// if (tjsonAddObject(pJson, "sma", tsdbSmaFileToJson, pDFileSet->pSmaFile) < 0) goto _err;
return code;
......@@ -273,7 +274,8 @@ static int32_t tsdbLoadCurrentState(STsdbFS *pFS, STsdbFSState *pState) {
char *pData = NULL;
TdFilePtr pFD;
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/CURRENT", pFS->pTsdb->path);
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sCURRENT", tfsGetPrimaryPath(pFS->pTsdb->pVnode->pTfs), TD_DIRSEP,
pFS->pTsdb->path, TD_DIRSEP);
if (!taosCheckExistFile(fname)) {
// create an empry CURRENT file if not exists
......@@ -321,38 +323,6 @@ _err:
return code;
}
static int32_t tsdbFSOpenImpl(STsdbFS *pFS) {
int32_t code = 0;
int64_t size;
int64_t n;
// read CURRENT file
code = tsdbLoadCurrentState(pFS, pFS->cState);
if (code) goto _err;
// decode the statue file
// code = tsdbDecodeFSState(pData, pFS->cState);
// if (code) goto _err;
// // scan and fix invalid tsdb FS
// code = tsdbScanAndFixFS(pFS);
// if (code) goto _err;
// if (pData) taosMemoryFree(pData);
return code;
_err:
// if (pData) taosMemoryFree(pData);
tsdbError("vgId:%d tsdb fs open impl failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbFSCloseImpl(STsdbFS *pFS) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) {
int32_t code = 0;
// TODO
......@@ -448,8 +418,25 @@ _err:
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
static void tsdbFSDestroy(STsdbFS *pFS) {
if (pFS) {
if (pFS->nState) {
taosArrayDestroy(pFS->nState->aDFileSet);
taosMemoryFree(pFS->nState);
}
if (pFS->cState) {
taosArrayDestroy(pFS->cState->aDFileSet);
taosMemoryFree(pFS->cState);
}
taosThreadRwlockDestroy(&pFS->lock);
taosMemoryFree(pFS);
}
// TODO
}
static int32_t tsdbFSCreate(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
STsdbFS *pFS = NULL;
......@@ -462,6 +449,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
code = taosThreadRwlockInit(&pFS->lock, NULL);
if (code) {
taosMemoryFree(pFS);
code = TAOS_SYSTEM_ERROR(code);
goto _err;
}
......@@ -473,7 +461,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *));
pFS->cState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->cState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
......@@ -484,43 +472,180 @@ int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet *));
pFS->nState->aDFileSet = taosArrayInit(0, sizeof(SDFileSet));
if (pFS->nState->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
code = tsdbFSOpenImpl(pFS);
if (code) goto _err;
*ppFS = pFS;
return code;
_err:
tsdbError("vgId:%d tsdb fs create failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbFSDestroy(pFS);
*ppFS = NULL;
tsdbError("vgId:%d tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSClose(STsdbFS *pFS) {
static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
int32_t code = 0;
STsdb *pTsdb = pFS->pTsdb;
STfs *pTfs = pTsdb->pVnode->pTfs;
int64_t size;
char fname[TSDB_FILENAME_LEN];
char pHdr[TSDB_FHDR_SIZE];
TdFilePtr pFD;
// SDelFile
if (pFS->cState->pDelFile) {
tsdbDelFileName(pTsdb, pFS->cState->pDelFile, fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size != pFS->cState->pDelFile->size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
if (deepScan) {
// TODO
}
}
// SArray<SDFileSet>
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->cState->aDFileSet); iSet++) {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->cState->aDFileSet, iSet);
// head =========
tsdbDataFileName(pTsdb, pDFileSet, TSDB_HEAD_FILE, fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (deepScan) {
// TODO
}
// data =========
tsdbDataFileName(pTsdb, pDFileSet, TSDB_DATA_FILE, fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < pDFileSet->fData.size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > pDFileSet->fData.size) {
ASSERT(0);
// need to rollback the file
}
if (deepScan) {
// TODO
}
// last ===========
tsdbDataFileName(pTsdb, pDFileSet, TSDB_LAST_FILE, fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < pDFileSet->fLast.size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > pDFileSet->fLast.size) {
ASSERT(0);
// need to rollback the file
}
if (deepScan) {
// TODO
}
// sma =============
tsdbDataFileName(pTsdb, pDFileSet, TSDB_SMA_FILE, fname);
if (taosStatFile(fname, &size, NULL)) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (size < pDFileSet->fSma.size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
} else if (size > pDFileSet->fSma.size) {
ASSERT(0);
// need to rollback the file
}
if (deepScan) {
// TODO
}
}
// remove those invalid files (todo)
#if 0
STfsDir *tdir;
const STfsFile *pf;
tdir = tfsOpendir(pTfs, pTsdb->path);
if (tdir == NULL) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
while ((pf = tfsReaddir(tdir))) {
tfsBasename(pf, fname);
}
tfsClosedir(tdir);
#endif
return code;
_err:
tsdbError("vgId:%d tsdb can and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
// EXPOSED APIS ====================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, STsdbFS **ppFS) {
int32_t code = 0;
if (pFS) {
code = tsdbFSCloseImpl(pFS);
if (code) goto _err;
// create handle
code = tsdbFSCreate(pTsdb, ppFS);
if (code) goto _err;
taosArrayDestroy(pFS->nState->aDFileSet);
taosMemoryFree(pFS->nState);
taosArrayDestroy(pFS->cState->aDFileSet);
taosMemoryFree(pFS->cState);
taosThreadRwlockDestroy(&pFS->lock);
taosMemoryFree(pFS);
// load current state
code = tsdbLoadCurrentState(*ppFS, (*ppFS)->cState);
if (code) {
tsdbFSDestroy(*ppFS);
goto _err;
}
// scan and fix FS
code = tsdbScanAndTryFixFS(*ppFS, 0);
if (code) {
tsdbFSDestroy(*ppFS);
goto _err;
}
return code;
_err:
tsdbError("vgId:%d tsdb fs close failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code));
*ppFS = NULL;
tsdbError("vgId:%d tsdb fs open failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbFSClose(STsdbFS *pFS) {
int32_t code = 0;
tsdbFSDestroy(pFS);
return code;
}
......
......@@ -15,29 +15,48 @@
#include "tsdb.h"
static const char *tsdbFileSuffix[] = {".del", ".cache", ".head", ".data", ".last", ".sma", ""};
void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, int8_t ftype, char fname[]) {
STfs *pTfs = pTsdb->pVnode->pTfs;
// SHeadFile ===============================================
void tsdbHeadFileName(STsdb *pTsdb, SHeadFile *pFile, char fname[]) {
// snprintf(fname, TSDB_FILENAME_LEN - 1, "%s/v%df%dver%18d.head", );
switch (ftype) {
case TSDB_HEAD_FILE:
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fHead.commitID,
".head");
break;
case TSDB_DATA_FILE:
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fData.commitID,
".data");
break;
case TSDB_LAST_FILE:
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fLast.commitID,
".last");
break;
case TSDB_SMA_FILE:
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTfs, pDFileSet->diskId),
TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), pDFileSet->fid, pDFileSet->fSma.commitID,
".sma");
break;
default:
ASSERT(0);
break;
}
}
// SHeadFile ===============================================
// SDataFile ===============================================
void tsdbDataFileName(STsdb *pTsdb, SDataFile *pFile, char fname[]) {
// TODO
}
// SLastFile ===============================================
void tsdbLastFileName(STsdb *pTsdb, SLastFile *pFile, char fname[]) {
// TODO
}
// SSmaFile ===============================================
void tsdbSmaFileName(STsdb *pTsdb, SSmaFile *pFile, char fname[]) {
// TODO
}
// SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
// snprintf(fname, TSDB_FILENAME_LEN, "", pTsdb->path);
STfs *pTfs = pTsdb->pVnode->pTfs;
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%dver%" PRId64 "%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, pTsdb->path,
TD_DIRSEP, TD_VID(pTsdb->pVnode), pFile->commitID, ".del");
}
\ No newline at end of file
......@@ -175,6 +175,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
// update the state of pMemTable and other (todo)
pMemTable->minVersion = TMIN(pMemTable->minVersion, version);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, version);
pMemTable->nDel++;
if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
......@@ -521,7 +523,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
goto _err;
}
if (pTbData->minKey > key.ts) pTbData->minKey = key.ts;
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
pLastRow = row.pTSRow;
......@@ -553,14 +555,14 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow);
}
}
if (pTbData->minVersion > version) pTbData->minVersion = version;
if (pTbData->maxVersion < version) pTbData->maxVersion = version;
pTbData->minVersion = TMIN(pTbData->minVersion, version);
pTbData->maxVersion = TMAX(pTbData->maxVersion, version);
// SMemTable
if (pMemTable->minKey > pTbData->minKey) pMemTable->minKey = pTbData->minKey;
if (pMemTable->maxKey < pTbData->maxKey) pMemTable->maxKey = pTbData->maxKey;
if (pMemTable->minVersion > pTbData->minVersion) pMemTable->minVersion = pTbData->minVersion;
if (pMemTable->maxVersion < pTbData->maxVersion) pMemTable->maxVersion = pTbData->maxVersion;
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->minVersion = TMIN(pMemTable->minVersion, pTbData->minVersion);
pMemTable->maxVersion = TMAX(pMemTable->maxVersion, pTbData->maxVersion);
pMemTable->nRow += nRow;
pRsp->numOfRows = nRow;
......
......@@ -41,7 +41,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
int slen = 0;
*ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3;
slen = strlen(pVnode->path) + strlen(dir) + 2;
// create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
......@@ -50,10 +50,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
return -1;
}
ASSERT(strlen(dir) < TSDB_DATA_DIR_LEN);
memcpy(pTsdb->dir, dir, strlen(dir));
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
sprintf(pTsdb->path, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode;
pTsdb->repoLocked = false;
......@@ -65,8 +63,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
}
// pTsdb->fs = tsdbNewFS(REPO_KEEP_CFG(pTsdb));
// create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path);
// create dir
tfsMkdir(pVnode->pTfs, pTsdb->path);
// open tsdb
if (tsdbFSOpen(pTsdb, &pTsdb->fs) < 0) {
......
......@@ -15,7 +15,6 @@
#include "tsdb.h"
#define TSDB_FHDR_SIZE 512
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
// SDelFWriter ====================================================
......@@ -443,8 +442,8 @@ _err:
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadFile->offset;
int64_t size = pReader->pSet->pHeadFile->size;
int64_t offset = pReader->pSet->fHead.offset;
int64_t size = pReader->pSet->fHead.size - offset;
int64_t n;
uint32_t delimiter;
......@@ -629,10 +628,10 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) {
int64_t size = TSDB_FHDR_SIZE;
int64_t n;
uint8_t *pBuf = NULL;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
SDataFile *pDataFile = pWriter->pSet->pDataFile;
SLastFile *pLastFile = pWriter->pSet->pLastFile;
SSmaFile *pSmaFile = pWriter->pSet->pSmaFile;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
SDataFile *pDataFile = &pWriter->pSet->fData;
SLastFile *pLastFile = &pWriter->pSet->fLast;
SSmaFile *pSmaFile = &pWriter->pSet->fSma;
// alloc
if (!ppBuf) ppBuf = &pBuf;
......@@ -724,7 +723,7 @@ _err:
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) {
int32_t code = 0;
int64_t size = 0;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
int64_t n = 0;
uint8_t *pBuf = NULL;
......@@ -766,7 +765,7 @@ _err:
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) {
int32_t code = 0;
SHeadFile *pHeadFile = pWriter->pSet->pHeadFile;
SHeadFile *pHeadFile = &pWriter->pSet->fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册