未验证 提交 6dfca928 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #1467 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
...@@ -472,7 +472,7 @@ void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t le ...@@ -472,7 +472,7 @@ void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t le
SL_GET_FORWARD_POINTER(x, i) = pNode; SL_GET_FORWARD_POINTER(x, i) = pNode;
} else { } else {
SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); // SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead);
} }
} }
} }
......
...@@ -82,6 +82,19 @@ int tsdbOpenFile(SFile *pFile, int oflag); ...@@ -82,6 +82,19 @@ int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
#define TSDB_FGROUP_ITER_FORWARD 0
#define TSDB_FGROUP_ITER_BACKWARD 1
typedef struct {
int numOfFGroups;
SFileGroup *base;
SFileGroup *pFileGroup;
int direction;
} SFileGroupIter;
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction);
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid);
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter);
typedef struct { typedef struct {
int32_t len; int32_t len;
int32_t offset; int32_t offset;
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include "tutil.h"
#include "tsdbFile.h" #include "tsdbFile.h"
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {
...@@ -35,6 +36,7 @@ static int compFGroup(const void *arg1, const void *arg2); ...@@ -35,6 +36,7 @@ static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
...@@ -50,10 +52,17 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -50,10 +52,17 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
return NULL; return NULL;
} }
struct dirent *dp; struct dirent *dp = NULL;
int fid = 0;
SFileGroup fGroup = {0};
while ((dp = readdir(dir)) != NULL) { while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue;
// TODO int fid = 0;
sscanf(dp->d_name, "f%d", &fid);
if (tsdbOpenFGroup(pFileH, dataDir, fid) < 0) {
break;
// TODO
}
} }
return pFileH; return pFileH;
...@@ -61,6 +70,30 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { ...@@ -61,6 +70,30 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); }
static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) {
tsdbGetFileName(dataDir, fid, suffix, pFile->fname);
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1;
pFile->fd = -1;
// TODO: recover the file info
// pFile->info = {0};
return 0;
}
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0;
char fname[128] = "\0";
SFileGroup fGroup = {0};
fGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1;
}
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return 0;
}
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
...@@ -101,6 +134,51 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { ...@@ -101,6 +134,51 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
return 0; return 0;
} }
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
pIter->direction = direction;
pIter->base = pFileH->fGroup;
pIter->numOfFGroups = pFileH->numOfFGroups;
if (pFileH->numOfFGroups == 0){
pIter->pFileGroup = NULL;
} else {
if (direction == TSDB_FGROUP_ITER_FORWARD) {
pIter->pFileGroup = pFileH->fGroup;
} else {
pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1;
}
}
}
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
if (ptr == NULL) {
pIter->pFileGroup = NULL;
} else {
pIter->pFileGroup = (SFileGroup *)ptr;
}
}
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
SFileGroup *ret = pIter->pFileGroup;
if (ret == NULL) return NULL;
if (pIter->direction = TSDB_FGROUP_ITER_FORWARD) {
if (pIter->pFileGroup + 1 == pIter->base + pIter->numOfFGroups) {
pIter->pFileGroup = NULL;
} else {
pIter->pFileGroup += 1;
}
} else {
if (pIter->pFileGroup - 1 == pIter->base) {
pIter->pFileGroup = NULL;
} else {
pIter->pFileGroup -= 1;
}
}
return ret;
}
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
SCompBlock *pBlock = pStartBlock; SCompBlock *pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) { for (int i = 0; i < numOfBlocks; i++) {
......
...@@ -237,6 +237,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { ...@@ -237,6 +237,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) {
* @return a TSDB repository handle on success, NULL for failure and the error number is set * @return a TSDB repository handle on success, NULL for failure and the error number is set
*/ */
tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
char dataDir[128] = "\0";
if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) {
return NULL; return NULL;
} }
...@@ -265,6 +266,16 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { ...@@ -265,6 +266,16 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return NULL; return NULL;
} }
tsdbGetDataDirName(pRepo, dataDir);
pRepo->tsdbFileH = tsdbInitFileH(dataDir, pRepo->config.maxTables);
if (pRepo->tsdbFileH == NULL) {
tsdbFreeCache(pRepo->tsdbCache);
tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir);
free(pRepo);
return NULL;
}
pRepo->state = TSDB_REPO_STATE_ACTIVE; pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo; return (tsdb_repo_t *)pRepo;
...@@ -287,8 +298,29 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) { ...@@ -287,8 +298,29 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) {
if (pRepo == NULL) return 0; if (pRepo == NULL) return 0;
pRepo->state = TSDB_REPO_STATE_CLOSED; pRepo->state = TSDB_REPO_STATE_CLOSED;
tsdbLockRepo(repo);
if (pRepo->commit) {
tsdbUnLockRepo(repo);
return -1;
}
pRepo->commit = 1;
// Loop to move pData to iData
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pRepo->tsdbMeta->tables[i];
if (pTable != NULL && pTable->mem != NULL) {
pTable->imem = pTable->mem;
pTable->mem = NULL;
}
}
// TODO: Loop to move mem to imem
pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
pRepo->tsdbCache->mem = NULL;
pRepo->tsdbCache->curBlock = NULL;
tsdbUnLockRepo(repo);
tsdbCommitData((void *)repo);
tsdbFlushCache(pRepo); tsdbCloseFileH(pRepo->tsdbFileH);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
......
...@@ -49,6 +49,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ...@@ -49,6 +49,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
} }
// TEST(TsdbTest, DISABLED_createRepo) {
TEST(TsdbTest, createRepo) { TEST(TsdbTest, createRepo) {
STsdbCfg config; STsdbCfg config;
...@@ -87,6 +88,7 @@ TEST(TsdbTest, createRepo) { ...@@ -87,6 +88,7 @@ TEST(TsdbTest, createRepo) {
double stime = getCurTime(); double stime = getCurTime();
for (int k = 0; k < nRows/rowsPerSubmit; k++) { for (int k = 0; k < nRows/rowsPerSubmit; k++) {
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
SSubmitBlk *pBlock = pMsg->blocks; SSubmitBlk *pBlock = pMsg->blocks;
pBlock->uid = 987607499877672L; pBlock->uid = 987607499877672L;
pBlock->tid = 0; pBlock->tid = 0;
...@@ -108,6 +110,9 @@ TEST(TsdbTest, createRepo) { ...@@ -108,6 +110,9 @@ TEST(TsdbTest, createRepo) {
} }
pBlock->len += dataRowLen(row); pBlock->len += dataRowLen(row);
} }
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
pMsg->numOfBlocks = 1;
pBlock->len = htonl(pBlock->len); pBlock->len = htonl(pBlock->len);
pBlock->numOfRows = htonl(pBlock->numOfRows); pBlock->numOfRows = htonl(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid); pBlock->uid = htobe64(pBlock->uid);
...@@ -116,7 +121,6 @@ TEST(TsdbTest, createRepo) { ...@@ -116,7 +121,6 @@ TEST(TsdbTest, createRepo) {
pBlock->sversion = htonl(pBlock->sversion); pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding); pBlock->padding = htonl(pBlock->padding);
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pMsg->compressed = htonl(pMsg->numOfBlocks); pMsg->compressed = htonl(pMsg->numOfBlocks);
...@@ -126,15 +130,17 @@ TEST(TsdbTest, createRepo) { ...@@ -126,15 +130,17 @@ TEST(TsdbTest, createRepo) {
double etime = getCurTime(); double etime = getCurTime();
printf("Spent %f seconds to write %d records\n", etime - stime, nRows); void *ptr = malloc(150000);
free(ptr);
printf("Spent %f seconds to write %d records\n", etime - stime, nRows);
// tsdbTriggerCommit(pRepo); tsdbCloseRepo(pRepo);
} }
TEST(TsdbTest, DISABLED_openRepo) { // TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) {
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0");
ASSERT_NE(pRepo, nullptr); ASSERT_NE(pRepo, nullptr);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册