diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 1760919b053ffcd1ea9703b1b7a51bd023d49547..77f643a2c7ea2840439e4d21f173bf78ac571869 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -472,7 +472,7 @@ void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, int32_t le SL_GET_FORWARD_POINTER(x, i) = pNode; } else { SL_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode; - SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); + // SL_GET_BACKWARD_POINTER(pSkipList->pHead, i) = (pSkipList->pHead); } } } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 0c85c5ef46548df5fd6d1d5866c2ddff411040a7..6c42d4aa155addfd1daa8d9d98319dec9bff748d 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -82,6 +82,19 @@ int tsdbOpenFile(SFile *pFile, int oflag); int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid); int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); +#define TSDB_FGROUP_ITER_FORWARD 0 +#define TSDB_FGROUP_ITER_BACKWARD 1 +typedef struct { + int numOfFGroups; + SFileGroup *base; + SFileGroup *pFileGroup; + int direction; +} SFileGroupIter; + +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction); +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid); +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter); + typedef struct { int32_t len; int32_t offset; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 5240a99a370727a773422877c6e88a8984ea42dc..d6964112e7fef975aaf0b34c67faefe268fdab4e 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -22,6 +22,7 @@ #include #include +#include "tutil.h" #include "tsdbFile.h" const char *tsdbFileSuffix[] = { @@ -35,6 +36,7 @@ static int compFGroup(const void *arg1, const void *arg2); static int tsdbGetFileName(char *dataDir, int fileId, char *suffix, char *fname); static int tsdbWriteFileHead(SFile *pFile); static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); +static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); @@ -50,10 +52,17 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { return NULL; } - struct dirent *dp; + struct dirent *dp = NULL; + int fid = 0; + SFileGroup fGroup = {0}; while ((dp = readdir(dir)) != NULL) { if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; - // TODO + int fid = 0; + sscanf(dp->d_name, "f%d", &fid); + if (tsdbOpenFGroup(pFileH, dataDir, fid) < 0) { + break; + // TODO + } } return pFileH; @@ -61,6 +70,30 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } +static int tsdbInitFile(char *dataDir, int fid, char *suffix, SFile *pFile) { + tsdbGetFileName(dataDir, fid, suffix, pFile->fname); + if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; + pFile->fd = -1; + // TODO: recover the file info + // pFile->info = {0}; + return 0; +} + +static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { + if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; + + char fname[128] = "\0"; + SFileGroup fGroup = {0}; + fGroup.fileId = fid; + + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1; + } + pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; + qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + return 0; +} + int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; @@ -101,6 +134,51 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { return 0; } +void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { + pIter->direction = direction; + pIter->base = pFileH->fGroup; + pIter->numOfFGroups = pFileH->numOfFGroups; + if (pFileH->numOfFGroups == 0){ + pIter->pFileGroup = NULL; + } else { + if (direction == TSDB_FGROUP_ITER_FORWARD) { + pIter->pFileGroup = pFileH->fGroup; + } else { + pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1; + } + } +} + +void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { + int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; + void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); + if (ptr == NULL) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup = (SFileGroup *)ptr; + } +} + +SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { + SFileGroup *ret = pIter->pFileGroup; + if (ret == NULL) return NULL; + + if (pIter->direction = TSDB_FGROUP_ITER_FORWARD) { + if (pIter->pFileGroup + 1 == pIter->base + pIter->numOfFGroups) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup += 1; + } + } else { + if (pIter->pFileGroup - 1 == pIter->base) { + pIter->pFileGroup = NULL; + } else { + pIter->pFileGroup -= 1; + } + } + return ret; +} + int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { SCompBlock *pBlock = pStartBlock; for (int i = 0; i < numOfBlocks; i++) { diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index bf870a0d05910fcd83509f9d5bd1b5494c404f84..6d675d300a418b521fd5595396c25f3dfac88d55 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -237,6 +237,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo) { * @return a TSDB repository handle on success, NULL for failure and the error number is set */ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { + char dataDir[128] = "\0"; if (access(tsdbDir, F_OK | W_OK | R_OK) < 0) { return NULL; } @@ -265,6 +266,16 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return NULL; } + tsdbGetDataDirName(pRepo, dataDir); + pRepo->tsdbFileH = tsdbInitFileH(dataDir, pRepo->config.maxTables); + if (pRepo->tsdbFileH == NULL) { + tsdbFreeCache(pRepo->tsdbCache); + tsdbFreeMeta(pRepo->tsdbMeta); + free(pRepo->rootDir); + free(pRepo); + return NULL; + } + pRepo->state = TSDB_REPO_STATE_ACTIVE; return (tsdb_repo_t *)pRepo; @@ -287,8 +298,29 @@ int32_t tsdbCloseRepo(tsdb_repo_t *repo) { if (pRepo == NULL) return 0; pRepo->state = TSDB_REPO_STATE_CLOSED; + tsdbLockRepo(repo); + if (pRepo->commit) { + tsdbUnLockRepo(repo); + return -1; + } + pRepo->commit = 1; + // Loop to move pData to iData + for (int i = 0; i < pRepo->config.maxTables; i++) { + STable *pTable = pRepo->tsdbMeta->tables[i]; + if (pTable != NULL && pTable->mem != NULL) { + pTable->imem = pTable->mem; + pTable->mem = NULL; + } + } + // TODO: Loop to move mem to imem + pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; + pRepo->tsdbCache->mem = NULL; + pRepo->tsdbCache->curBlock = NULL; + tsdbUnLockRepo(repo); + + tsdbCommitData((void *)repo); - tsdbFlushCache(pRepo); + tsdbCloseFileH(pRepo->tsdbFileH); tsdbFreeMeta(pRepo->tsdbMeta); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index bc6532984fddd74934c9f87a9e5f1086f08faef3..6cfe0e626dd217b70597c2f75f2f7a575472d1e3 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -49,6 +49,7 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) { ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); } +// TEST(TsdbTest, DISABLED_createRepo) { TEST(TsdbTest, createRepo) { STsdbCfg config; @@ -87,6 +88,7 @@ TEST(TsdbTest, createRepo) { double stime = getCurTime(); for (int k = 0; k < nRows/rowsPerSubmit; k++) { + memset((void *)pMsg, 0, sizeof(SSubmitMsg)); SSubmitBlk *pBlock = pMsg->blocks; pBlock->uid = 987607499877672L; pBlock->tid = 0; @@ -108,6 +110,9 @@ TEST(TsdbTest, createRepo) { } pBlock->len += dataRowLen(row); } + pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; + pMsg->numOfBlocks = 1; + pBlock->len = htonl(pBlock->len); pBlock->numOfRows = htonl(pBlock->numOfRows); pBlock->uid = htobe64(pBlock->uid); @@ -116,7 +121,6 @@ TEST(TsdbTest, createRepo) { pBlock->sversion = htonl(pBlock->sversion); pBlock->padding = htonl(pBlock->padding); - pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->compressed = htonl(pMsg->numOfBlocks); @@ -126,15 +130,17 @@ TEST(TsdbTest, createRepo) { double etime = getCurTime(); - printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - + void *ptr = malloc(150000); + free(ptr); + printf("Spent %f seconds to write %d records\n", etime - stime, nRows); - // tsdbTriggerCommit(pRepo); + tsdbCloseRepo(pRepo); } -TEST(TsdbTest, DISABLED_openRepo) { +// TEST(TsdbTest, DISABLED_openRepo) { +TEST(TsdbTest, openRepo) { tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); ASSERT_NE(pRepo, nullptr); }