提交 24bd3b4d 编写于 作者: P plum-lihui

Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0

add_subdirectory(transport) add_subdirectory(transport)
add_subdirectory(sync) add_subdirectory(sync)
add_subdirectory(tdb) # add_subdirectory(tdb)
add_subdirectory(index) add_subdirectory(index)
add_subdirectory(wal) add_subdirectory(wal)
add_subdirectory(parser) add_subdirectory(parser)
......
# tdb
set(TDB_SUBDIRS "db") add_library(tdb "")
foreach(TDB_SUBDIR ${TDB_SUBDIRS}) target_sources(tdb
aux_source_directory("src/${TDB_SUBDIR}" TDB_SRC) PRIVATE
endforeach() "src/db/tdbPCache.c"
"src/db/tdbPager.c"
add_library(tdb STATIC ${TDB_SRC}) "src/db/tdbUtil.c"
"src/db/tdbBtree.c"
"src/db/tdbDb.c"
"src/db/tdbEnv.c"
# "src/db/tdbPage.c"
"src/page/tdbPage.c"
"src/page/tdbPageL.c"
)
target_include_directories( target_include_directories(
tdb tdb
...@@ -17,6 +24,7 @@ target_link_libraries( ...@@ -17,6 +24,7 @@ target_link_libraries(
PUBLIC util PUBLIC util
) )
# for test
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -22,44 +22,42 @@ ...@@ -22,44 +22,42 @@
extern "C" { extern "C" {
#endif #endif
typedef struct STDb TDB; // typedef struct STDb TDB;
typedef struct STDbEnv TENV; // typedef struct STDbEnv TENV;
typedef struct STDbCurosr TDBC; // typedef struct STDbCurosr TDBC;
typedef int32_t pgsz_t; // typedef int32_t pgsz_t;
typedef int32_t cachesz_t; // typedef int32_t cachesz_t;
typedef int (*TdbKeyCmprFn)(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2); // typedef int (*TdbKeyCmprFn)(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
// TEVN // // TEVN
int tdbEnvCreate(TENV **ppEnv, const char *rootDir); // int tdbEnvCreate(TENV **ppEnv, const char *rootDir);
int tdbEnvOpen(TENV *ppEnv); // int tdbEnvOpen(TENV *ppEnv);
int tdbEnvClose(TENV *pEnv); // int tdbEnvClose(TENV *pEnv);
int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize); // int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize);
pgsz_t tdbEnvGetPageSize(TENV *pEnv); // pgsz_t tdbEnvGetPageSize(TENV *pEnv);
cachesz_t tdbEnvGetCacheSize(TENV *pEnv); // cachesz_t tdbEnvGetCacheSize(TENV *pEnv);
int tdbEnvBeginTxn(TENV *pEnv); // int tdbEnvBeginTxn(TENV *pEnv);
int tdbEnvCommit(TENV *pEnv); // int tdbEnvCommit(TENV *pEnv);
// TDB // // TDB
int tdbCreate(TDB **ppDb); // int tdbCreate(TDB **ppDb);
int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv); // int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv);
int tdbClose(TDB *pDb); // int tdbClose(TDB *pDb);
int tdbDrop(TDB *pDb); // int tdbDrop(TDB *pDb);
int tdbSetKeyLen(TDB *pDb, int klen); // int tdbSetKeyLen(TDB *pDb, int klen);
int tdbSetValLen(TDB *pDb, int vlen); // int tdbSetValLen(TDB *pDb, int vlen);
int tdbSetDup(TDB *pDb, int dup); // int tdbSetDup(TDB *pDb, int dup);
int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn); // int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn);
int tdbGetKeyLen(TDB *pDb); // int tdbGetKeyLen(TDB *pDb);
int tdbGetValLen(TDB *pDb); // int tdbGetValLen(TDB *pDb);
int tdbGetDup(TDB *pDb); // int tdbGetDup(TDB *pDb);
int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData); // int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData);
// TDBC
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
/* Per-database handle: name, storage backing and key/value settings. */
struct STDb {
char dbname[TDB_MAX_DBNAME_LEN]; // database name; tdbOpen() stores it NUL-terminated
SBTree * pBt; // current access method (may extend)
SPgFile * pPgFile; // backend page file this DB is using
TENV * pEnv; // TENV containing the DB
int klen; // key length, if known (tdbCreate() defaults it to TDB_VARIANT_LEN)
int vlen; // value length, if known (tdbCreate() defaults it to TDB_VARIANT_LEN)
bool dup; // dup mode
TdbKeyCmprFn cFn; // key comparison function
};
/* Database cursor; currently it only wraps a B-tree cursor pointer. */
struct STDbCurosr {
SBtCursor *pBtCur; // underlying B-tree cursor
};
static int tdbDefaultKeyCmprFn(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
/*
 * Allocate a zero-filled TDB handle and populate its defaults:
 * variable-length keys and values, duplicates disabled, and the default
 * key comparison function.
 *
 * On success the handle is stored in *ppDb and 0 is returned. If the
 * allocation fails, -1 is returned and *ppDb is left untouched.
 */
int tdbCreate(TDB **ppDb) {
  TDB *pHandle = (TDB *)calloc(1, sizeof(*pHandle));
  if (pHandle == NULL) {
    return -1;
  }

  pHandle->cFn = tdbDefaultKeyCmprFn;
  pHandle->dup = false;
  pHandle->vlen = TDB_VARIANT_LEN;
  pHandle->klen = TDB_VARIANT_LEN;

  *ppDb = pHandle;
  return 0;
}
/*
 * Release the memory of a TDB handle. A NULL handle is accepted
 * (free(NULL) is a no-op). Always returns 0.
 */
static int tdbDestroy(TDB *pDb) {
  free(pDb);
  return 0;
}
/*
 * Bind a handle created by tdbCreate() to a page file inside an environment.
 *
 * pDb    - handle to open; must not be NULL
 * fname  - page file name, joined onto the environment root directory;
 *          must not be NULL
 * dbname - optional database name (may be NULL); must be shorter than
 *          TDB_MAX_DBNAME_LEN
 * pEnv   - owning environment; must not be NULL for now
 *
 * Returns 0 on success and -1 on failure (name too long, path too long,
 * file id / page file / root page allocation failure).
 */
int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv) {
  int      ret;
  uint8_t  fileid[TDB_FILE_ID_LEN];
  SPgFile *pPgFile = NULL;
  size_t   dbNameLen = 0;
  pgno_t   dbRootPgno = 0;  // 0 means "root page not known yet"
  char     dbfname[128];    // TODO: make this as a macro or malloc on the heap

  ASSERT(pDb != NULL);
  ASSERT(fname != NULL);
  // TODO: Here we simply put an assert here. In the future, make `pEnv`
  // can be set as NULL.
  ASSERT(pEnv != NULL);

  // check the DB name
  if (dbname) {
    dbNameLen = strlen(dbname);
    if (dbNameLen >= TDB_MAX_DBNAME_LEN) {
      return -1;
    }
    memcpy(pDb->dbname, dbname, dbNameLen);
  }
  pDb->dbname[dbNameLen] = '\0';

  // build the full path and refuse silently truncated names
  ret = snprintf(dbfname, sizeof(dbfname), "%s/%s", tdbEnvGetRootDir(pEnv), fname);
  if (ret < 0 || (size_t)ret >= sizeof(dbfname)) {
    return -1;
  }

  // get page file from the env, if not opened yet, open it.
  // The existence check must use the full path, not the bare file name.
  if (taosCheckExistFile(dbfname)) {
    if (tdbGnrtFileID(dbfname, fileid, false) < 0) {
      return -1;
    }
    pPgFile = tdbEnvGetPageFile(pEnv, fileid);
  }

  if (pPgFile == NULL) {
    ret = pgFileOpen(&pPgFile, dbfname, pEnv);
    if (ret != 0) {
      // TODO: handle error
      return -1;
    }
  }

  // TODO: get the root page number from the master DB of the page file
  // tdbGet(&dbRootPgno);
  if (dbRootPgno == 0) {
    // DB not exist, create one
    ret = pgFileAllocatePage(pPgFile, &dbRootPgno);
    if (ret != 0) {
      return -1;
    }
    // tdbInsert(pPgFile->pMasterDB, dbname, strlen(dbname), &dbRootPgno, sizeof(dbRootPgno));
  }

  ASSERT(dbRootPgno > 1);

  // pDb->pBt->root = dbRootPgno;

  // register
  pDb->pPgFile = pPgFile;
  tdbEnvRgstDB(pEnv, pDb);
  pDb->pEnv = pEnv;

  return 0;
}
/*
 * Close a database handle. A NULL handle is a no-op that returns 0;
 * otherwise the result of tdbDestroy() is returned.
 */
int tdbClose(TDB *pDb) {
  return (pDb == NULL) ? 0 : tdbDestroy(pDb);
}
/* Drop a database. Not implemented yet: does nothing and returns 0. */
int tdbDrop(TDB *pDb) {
// TODO
return 0;
}
/*
 * Record the key length on the handle. The value is stored as given
 * (no validation yet). Always returns 0.
 */
int tdbSetKeyLen(TDB *pDb, int klen) {
  // TODO: check `klen`
  pDb->klen = klen;

  return 0;
}
/*
 * Record the value length on the handle. The value is stored as given
 * (no validation yet). Always returns 0.
 */
int tdbSetValLen(TDB *pDb, int vlen) {
  // TODO: check `vlen`
  pDb->vlen = vlen;

  return 0;
}
/*
 * Set the dup mode of the handle: any non-zero `dup` enables it,
 * zero disables it. Always returns 0.
 */
int tdbSetDup(TDB *pDb, int dup) {
  pDb->dup = (dup != 0) ? true : false;
  return 0;
}
/*
 * Install a key comparison function on the handle.
 * Returns -1 (leaving the handle unchanged) when `fn` is NULL, 0 otherwise.
 */
int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn) {
  if (fn == NULL) {
    return -1;
  }

  pDb->cFn = fn;
  return 0;
}
/* Return the key length currently stored on the handle. */
int tdbGetKeyLen(TDB *pDb) {
  int keyLen = pDb->klen;
  return keyLen;
}
/* Return the value length currently stored on the handle. */
int tdbGetValLen(TDB *pDb) {
  int valLen = pDb->vlen;
  return valLen;
}
/* Report the dup mode of the handle as an int: 1 when enabled, 0 otherwise. */
int tdbGetDup(TDB *pDb) {
  return pDb->dup ? 1 : 0;
}
/* Insert a key/data pair. Not implemented yet: does nothing and returns 0. */
int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData) {
// TODO
return 0;
}
/*
 * Default key comparator: byte-wise comparison over the common prefix,
 * with the shorter key ordered first when the prefixes are equal.
 *
 * Both keys must be non-NULL with positive lengths (asserted).
 * Returns the memcmp() result when the common prefix differs; otherwise
 * -1, 0 or 1 according to how keyLen1 compares with keyLen2.
 */
static int tdbDefaultKeyCmprFn(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2) {
  ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);

  int nCommon = (keyLen2 > keyLen1) ? keyLen1 : keyLen2;
  int order = memcmp(pKey1, pKey2, nCommon);
  if (order != 0) {
    return order;
  }

  // Equal prefixes: the lengths decide.
  if (keyLen1 < keyLen2) return -1;
  if (keyLen1 > keyLen2) return 1;
  return 0;
}
\ No newline at end of file
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
/* Database handle: the environment it was opened in and its B-tree. */
struct STDb {
STEnv *pEnv; // environment passed to tdbDbOpen()
SBTree *pBt; // B-tree opened by tdbBtreeOpen() for this database
};
/*
 * Open a database stored in file `fname` under the environment root.
 *
 * fname     - file name, joined onto pEnv->rootDir when a pager must be opened
 * keyLen    - key length passed through to tdbBtreeOpen()
 * valLen    - value length passed through to tdbBtreeOpen()
 * keyCmprFn - key comparator passed through to tdbBtreeOpen()
 * pEnv      - environment providing the root directory and the page cache
 * ppDb      - receives the new handle; set to NULL on failure
 *
 * Returns 0 on success, -1 on failure. The handle allocated here is freed
 * on every failure path.
 */
int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDb **ppDb) {
  STDb   *pDb;
  SPager *pPager;
  int     ret;
  char    fFullName[TDB_FILENAME_LEN];

  *ppDb = NULL;

  pDb = (STDb *)calloc(1, sizeof(*pDb));
  if (pDb == NULL) {
    return -1;
  }

  // pDb->pEnv
  pDb->pEnv = pEnv;

  // Reuse a pager already known to the environment, otherwise open one.
  pPager = tdbEnvGetPager(pEnv, fname);
  if (pPager == NULL) {
    snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->rootDir, fname);
    ret = tdbPagerOpen(pEnv->pCache, fFullName, &pPager);
    if (ret < 0) {
      free(pDb);
      return -1;
    }
  }

  ASSERT(pPager != NULL);

  // pDb->pBt
  ret = tdbBtreeOpen(keyLen, valLen, pPager, keyCmprFn, &(pDb->pBt));
  if (ret < 0) {
    // NOTE(review): a pager opened above is not closed here; who owns it
    // is not visible from this function - confirm.
    free(pDb);
    return -1;
  }

  *ppDb = pDb;
  return 0;
}
/* Close a database handle. Not implemented yet: does nothing and returns 0. */
int tdbDbClose(STDb *pDb) {
// TODO
return 0;
}
/* Drop a database. Not implemented yet: does nothing and returns 0. */
int tdbDbDrop(STDb *pDb) {
// TODO
return 0;
}
/*
 * Insert a key/value pair through a temporary, stack-allocated B-tree
 * cursor opened on the database's B-tree.
 *
 * Returns 0 on success, -1 when either the cursor setup or the insert
 * reports a negative result.
 */
int tdbDbInsert(STDb *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) {
  SBtCursor cursor;

  if (tdbBtreeCursor(&cursor, pDb->pBt) < 0) {
    return -1;
  }

  if (tdbBtCursorInsert(&cursor, pKey, keyLen, pVal, valLen) < 0) {
    return -1;
  }

  return 0;
}
\ No newline at end of file
...@@ -15,155 +15,56 @@ ...@@ -15,155 +15,56 @@
#include "tdbInt.h" #include "tdbInt.h"
struct STDbEnv { int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, STEnv **ppEnv) {
char * rootDir; // root directory of the environment STEnv *pEnv;
char * jname; // journal file name int dsize;
TdFilePtr jpFile; // journal file fd int zsize;
pgsz_t pgSize; // page size u8 *pPtr;
cachesz_t cacheSize; // total cache size int ret;
STDbList dbList; // TDB List
SPgFileList pgfList; // SPgFile List
SPgCache * pPgCache; // page cache
struct {
#define TDB_ENV_PGF_HASH_BUCKETS 17
SPgFileList buckets[TDB_ENV_PGF_HASH_BUCKETS];
} pgfht; // page file hash table;
};
#define TDB_ENV_PGF_HASH(fileid) (((uint8_t *)(fileid))[0] + ((uint8_t *)(fileid))[1] + ((uint8_t *)(fileid))[2])
static int tdbEnvDestroy(TENV *pEnv);
int tdbEnvCreate(TENV **ppEnv, const char *rootDir) {
TENV * pEnv;
size_t slen;
size_t jlen;
ASSERT(rootDir != NULL);
*ppEnv = NULL; *ppEnv = NULL;
slen = strlen(rootDir);
jlen = slen + strlen(TDB_JOURNAL_NAME) + 1;
pEnv = (TENV *)calloc(1, sizeof(*pEnv) + slen + 1 + jlen + 1);
if (pEnv == NULL) {
return -1;
}
pEnv->rootDir = (char *)(&pEnv[1]);
pEnv->jname = pEnv->rootDir + slen + 1;
pEnv->jpFile = NULL;
pEnv->pgSize = TDB_DEFAULT_PGSIZE;
pEnv->cacheSize = TDB_DEFAULT_CACHE_SIZE;
memcpy(pEnv->rootDir, rootDir, slen); dsize = strlen(rootDir);
pEnv->rootDir[slen] = '\0'; zsize = sizeof(*pEnv) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3;
sprintf(pEnv->jname, "%s/%s", rootDir, TDB_JOURNAL_NAME);
TD_DLIST_INIT(&(pEnv->dbList));
TD_DLIST_INIT(&(pEnv->pgfList));
/* TODO */
*ppEnv = pEnv;
return 0;
}
int tdbEnvOpen(TENV *pEnv) {
SPgCache *pPgCache;
int ret;
ASSERT(pEnv != NULL);
/* TODO: here we do not need to create the root directory, more
* work should be done here
*/
mkdir(pEnv->rootDir, 0755);
ret = pgCacheOpen(&pPgCache, pEnv);
if (ret != 0) {
goto _err;
}
pEnv->pPgCache = pPgCache;
return 0;
_err:
return -1;
}
int tdbEnvClose(TENV *pEnv) {
if (pEnv == NULL) return 0;
pgCacheClose(pEnv->pPgCache);
tdbEnvDestroy(pEnv);
return 0;
}
int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize) { pPtr = (uint8_t *)calloc(1, zsize);
if (!TDB_IS_PGSIZE_VLD(pgSize) || cacheSize / pgSize < 10) { if (pPtr == NULL) {
return -1; return -1;
} }
/* TODO */ pEnv = (STEnv *)pPtr;
pPtr += sizeof(*pEnv);
pEnv->pgSize = pgSize; // pEnv->rootDir
pEnv->cacheSize = cacheSize; pEnv->rootDir = pPtr;
memcpy(pEnv->rootDir, rootDir, dsize);
return 0; pEnv->rootDir[dsize] = '\0';
} pPtr = pPtr + dsize + 1;
// pEnv->jfname
pgsz_t tdbEnvGetPageSize(TENV *pEnv) { return pEnv->pgSize; } pEnv->jfname = pPtr;
memcpy(pEnv->jfname, rootDir, dsize);
cachesz_t tdbEnvGetCacheSize(TENV *pEnv) { return pEnv->cacheSize; } pEnv->jfname[dsize] = '/';
memcpy(pEnv->jfname + dsize + 1, TDB_JOURNAL_NAME, strlen(TDB_JOURNAL_NAME));
SPgFile *tdbEnvGetPageFile(TENV *pEnv, const uint8_t fileid[]) { pEnv->jfname[dsize + 1 + strlen(TDB_JOURNAL_NAME)] = '\0';
SPgFileList *pBucket;
SPgFile * pPgFile; pEnv->jfd = -1;
pBucket = pEnv->pgfht.buckets + (TDB_ENV_PGF_HASH(fileid) % TDB_ENV_PGF_HASH_BUCKETS); // TODO ret = tdbPCacheOpen(pageSize, cacheSize, &(pEnv->pCache));
for (pPgFile = TD_DLIST_HEAD(pBucket); pPgFile != NULL; pPgFile = TD_DLIST_NODE_NEXT_WITH_FIELD(pPgFile, envHash)) { if (ret < 0) {
if (memcmp(fileid, pPgFile->fileid, TDB_FILE_ID_LEN) == 0) break;
};
return pPgFile;
}
SPgCache *tdbEnvGetPgCache(TENV *pEnv) { return pEnv->pPgCache; }
static int tdbEnvDestroy(TENV *pEnv) {
// TODO
return 0;
}
int tdbEnvBeginTxn(TENV *pEnv) {
pEnv->jpFile = taosOpenFile(pEnv->jname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ);
if (pEnv->jpFile == NULL) {
return -1; return -1;
} }
return 0; mkdir(rootDir, 0755);
}
int tdbEnvCommit(TENV *pEnv) { *ppEnv = pEnv;
/* TODO */
taosCloseFile(&pEnv->jpFile);
pEnv->jpFile = NULL;
return 0; return 0;
} }
const char *tdbEnvGetRootDir(TENV *pEnv) { return pEnv->rootDir; } int tdbEnvClose(STEnv *pEnv) {
// TODO
int tdbEnvRgstPageFile(TENV *pEnv, SPgFile *pPgFile) {
SPgFileList *pBucket;
TD_DLIST_APPEND_WITH_FIELD(&(pEnv->pgfList), pPgFile, envPgfList);
pBucket = pEnv->pgfht.buckets + (TDB_ENV_PGF_HASH(pPgFile->fileid) % TDB_ENV_PGF_HASH_BUCKETS); // TODO
TD_DLIST_APPEND_WITH_FIELD(pBucket, pPgFile, envHash);
return 0; return 0;
} }
int tdbEnvRgstDB(TENV *pEnv, TDB *pDb) { SPager *tdbEnvGetPager(STEnv *pEnv, const char *fname) {
// TODO // TODO
return 0; return NULL;
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
/* Page cache state: free list, hash table of cached pages and LRU list. */
struct SPCache {
int pageSize; // page size handed to tdbPageCreate() for every page
int cacheSize; // number of pages created at open time
pthread_mutex_t mutex; // protects the cache structures
int nFree; // number of pages on the free list
SPage *pFree; // head of the free list (linked through pFreeNext)
int nPage; // number of pages currently in the hash table
int nHash; // number of hash buckets
SPage **pgHash; // bucket array (chains linked through pHashNext)
int nRecyclable; // counter of unpinned pages
SPage lru; // anchor node of the circular LRU list (isAnchor == 1)
};
// Hash of a page id: sum of the first six u32 words of `fileid` plus the
// page number. Uses a GNU statement expression.
// NOTE(review): assumes `fileid` is at least 24 bytes and suitably aligned
// for u32 access - confirm against the SPgid definition.
#define PCACHE_PAGE_HASH(pPgid) \
({ \
u32 *t = (u32 *)((pPgid)->fileid); \
t[0] + t[1] + t[2] + t[3] + t[4] + t[5] + (pPgid)->pgno; \
})
// A page is treated as pinned (not on the LRU list) when pLruNext is NULL.
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
// For page ref
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
// The non-atomic variants are compiled out; the atomic ones below are used.
#if 0
#define TDB_REF_PAGE(pPage) (++(pPage)->nRef)
#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef)
#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef)
#else
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
#endif
static int tdbPCacheOpenImpl(SPCache *pCache);
static void tdbPCacheInitLock(SPCache *pCache);
static void tdbPCacheClearLock(SPCache *pCache);
static void tdbPCacheLock(SPCache *pCache);
static void tdbPCacheUnlock(SPCache *pCache);
static bool tdbPCacheLocked(SPCache *pCache);
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
static void tdbPCachePinPage(SPage *pPage);
static void tdbPCacheRemovePageFromHash(SPage *pPage);
static void tdbPCacheAddPageToHash(SPage *pPage);
static void tdbPCacheUnpinPage(SPage *pPage);
static void *tdbOsMalloc(void *arg, size_t size);
static void tdbOsFree(void *arg, void *ptr);
/*
 * Create a page cache.
 *
 * pageSize  - page size recorded on the cache
 * cacheSize - cache size recorded on the cache
 * ppCache   - receives the new cache on success; untouched on failure
 *
 * Returns 0 on success. Returns -1 when the cache object cannot be
 * allocated or when tdbPCacheOpenImpl() fails (the cache object is
 * freed in that case).
 */
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
  SPCache *pNewCache = (SPCache *)calloc(1, sizeof(*pNewCache));
  if (pNewCache == NULL) {
    return -1;
  }

  pNewCache->pageSize = pageSize;
  pNewCache->cacheSize = cacheSize;

  if (tdbPCacheOpenImpl(pNewCache) >= 0) {
    *ppCache = pNewCache;
    return 0;
  }

  free(pNewCache);
  return -1;
}
/* Close a page cache. Not implemented yet: releases nothing and returns 0. */
int tdbPCacheClose(SPCache *pCache) {
/* TODO */
return 0;
}
/*
 * Look up (and, when `alcNewPage` is set, possibly allocate) the page
 * identified by `pPgid`, under the cache lock. A returned page has had
 * its reference count incremented. Returns NULL when no page is obtained.
 */
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) {
  tdbPCacheLock(pCache);

  SPage *pFetched = tdbPCacheFetchImpl(pCache, pPgid, alcNewPage);
  if (pFetched != NULL) {
    TDB_REF_PAGE(pFetched);
  }

  tdbPCacheUnlock(pCache);
  return pFetched;
}
/*
 * Drop one reference to a page. When the count reaches zero the page is
 * handed to tdbPCacheUnpinPage(); the dirty-page branch is not
 * implemented yet.
 */
void tdbPCacheRelease(SPage *pPage) {
  i32 remaining = TDB_UNREF_PAGE(pPage);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    return;
  }

  if (1 /*TODO: page still clean*/) {
    tdbPCacheUnpinPage(pPage);
  } else {
    // TODO
    ASSERT(0);
  }
}
/* Initialise the cache mutex with default attributes. */
static void tdbPCacheInitLock(SPCache *pCache) {
  pthread_mutex_init(&(pCache->mutex), NULL);
}
/* Destroy the cache mutex. */
static void tdbPCacheClearLock(SPCache *pCache) {
  pthread_mutex_destroy(&(pCache->mutex));
}
/* Acquire the cache mutex. */
static void tdbPCacheLock(SPCache *pCache) {
  pthread_mutex_lock(&(pCache->mutex));
}
/* Release the cache mutex. */
static void tdbPCacheUnlock(SPCache *pCache) {
  pthread_mutex_unlock(&(pCache->mutex));
}
/*
 * Placeholder for "is the cache lock held?". Not implemented: it trips
 * assert(0) when assertions are enabled and otherwise returns true.
 */
static bool tdbPCacheLocked(SPCache *pCache) {
assert(0);
// TODO
return true;
}
/*
 * Core lookup behind tdbPCacheFetch().
 *
 * 1. Search the hash chain for `pPgid`; a hit is pinned and returned.
 * 2. On a miss with `alcNewPage` unset, return NULL.
 * 3. Otherwise take a page from the free list, or failing that recycle
 *    the page at the tail of the LRU list.
 * 4. A page obtained in step 3 gets the new id, is marked pinned and
 *    uninitialised (pPager == NULL) and is added to the hash table.
 *
 * Returns NULL when neither the free list nor the LRU list can supply a
 * page. Does not take the cache lock itself.
 */
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) {
  // 1. Search the hash table
  SPage *pHit = pCache->pgHash[PCACHE_PAGE_HASH(pPgid) % pCache->nHash];
  for (; pHit != NULL; pHit = pHit->pHashNext) {
    if (TDB_IS_SAME_PAGE(&(pHit->pgid), pPgid)) break;
  }

  if (pHit != NULL) {
    tdbPCachePinPage(pHit);
    return pHit;
  }

  if (!alcNewPage) {
    return NULL;
  }

  SPage *pNew = NULL;
  if (pCache->pFree != NULL) {
    // 2. Try to allocate a new page from the free list
    pNew = pCache->pFree;
    pCache->pFree = pNew->pFreeNext;
    pCache->nFree--;
    pNew->pLruNext = NULL;
  } else if (!pCache->lru.pLruPrev->isAnchor) {
    // 3. Try to recycle a page
    pNew = pCache->lru.pLruPrev;
    tdbPCacheRemovePageFromHash(pNew);
    tdbPCachePinPage(pNew);
  }

  // 4. Try a stress allocation (TODO)

  if (pNew == NULL) {
    return NULL;
  }

  // 5. The page was just taken from the free list or recycled:
  //    give it its new identity and make it findable.
  memcpy(&(pNew->pgid), pPgid, sizeof(*pPgid));
  pNew->pLruNext = NULL;
  pNew->pPager = NULL;
  tdbPCacheAddPageToHash(pNew);

  return pNew;
}
/*
 * Pin a page: if it is currently linked into the LRU list, unlink it,
 * mark it pinned (pLruNext = NULL) and decrement the recyclable counter.
 * An already pinned page is left untouched.
 */
static void tdbPCachePinPage(SPage *pPage) {
  if (PAGE_IS_PINNED(pPage)) {
    return;
  }

  SPCache *pOwner = pPage->pCache;

  pPage->pLruPrev->pLruNext = pPage->pLruNext;
  pPage->pLruNext->pLruPrev = pPage->pLruPrev;
  pPage->pLruNext = NULL;

  pOwner->nRecyclable--;
}
/*
 * Unpin a page: under the cache lock, re-read the reference count and,
 * if it is still zero, push the page onto the head of the LRU list.
 *
 * The recyclable counter is incremented only when the page is actually
 * linked into the LRU list, mirroring tdbPCachePinPage(), which
 * decrements it only when it unlinks a page.
 */
static void tdbPCacheUnpinPage(SPage *pPage) {
  SPCache *pCache;
  i32      nRef;

  pCache = pPage->pCache;

  tdbPCacheLock(pCache);

  // The count is re-checked under the lock: the page may have been
  // fetched again since the caller saw it drop to zero.
  nRef = TDB_GET_PAGE_REF(pPage);
  ASSERT(nRef >= 0);
  if (nRef == 0) {
    // Add the page to LRU list
    ASSERT(pPage->pLruNext == NULL);

    pPage->pLruPrev = &(pCache->lru);
    pPage->pLruNext = pCache->lru.pLruNext;
    pCache->lru.pLruNext->pLruPrev = pPage;
    pCache->lru.pLruNext = pPage;

    pCache->nRecyclable++;
  }

  tdbPCacheUnlock(pCache);
}
/*
 * Unlink a page from its hash chain and decrement the cached-page count.
 *
 * The bucket index is computed exactly as in tdbPCacheAddPageToHash():
 * the modulo is applied to the hash expression directly, so the index is
 * never derived from a hash value that was first narrowed to a signed int
 * (which could go negative and select a different, out-of-range bucket).
 */
static void tdbPCacheRemovePageFromHash(SPage *pPage) {
  SPCache *pCache;
  SPage  **ppPage;
  int      h;

  pCache = pPage->pCache;
  h = PCACHE_PAGE_HASH(&(pPage->pgid)) % pCache->nHash;

  // Walk the chain; stop at the page or at the end of the chain.
  for (ppPage = &(pCache->pgHash[h]); *ppPage != NULL && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
    ;

  ASSERT(*ppPage == pPage);
  if (*ppPage != pPage) {
    // Page is not in the table: nothing to unlink.
    return;
  }

  *ppPage = pPage->pHashNext;
  pCache->nPage--;
}
/*
 * Insert a page at the head of the hash chain selected by its page id
 * and increment the cached-page count.
 */
static void tdbPCacheAddPageToHash(SPage *pPage) {
  SPCache *pOwner = pPage->pCache;
  int      bucket = PCACHE_PAGE_HASH(&(pPage->pgid)) % pOwner->nHash;

  pPage->pHashNext = pOwner->pgHash[bucket];
  pOwner->pgHash[bucket] = pPage;

  pOwner->nPage++;
}
/*
 * Build the internal structures of a cache whose pageSize and cacheSize
 * are already set: the mutex, `cacheSize` pre-created pages on the free
 * list, a hash table with `cacheSize` buckets, and an empty LRU list.
 *
 * Returns 0 on success. On failure returns -1 after destroying every page
 * created so far and the mutex, so the caller only has to free the cache
 * object itself. A non-positive cacheSize is rejected because it would
 * produce a hash table with zero buckets.
 */
static int tdbPCacheOpenImpl(SPCache *pCache) {
  SPage *pPage;
  int    ret;

  if (pCache->cacheSize <= 0) {
    return -1;
  }

  tdbPCacheInitLock(pCache);

  // Open the free list
  pCache->nFree = 0;
  pCache->pFree = NULL;
  for (int i = 0; i < pCache->cacheSize; i++) {
    ret = tdbPageCreate(pCache->pageSize, &pPage, tdbOsMalloc, NULL);
    if (ret < 0) {
      goto _err;
    }

    // pPage->pgid = 0;
    pPage->isAnchor = 0;
    pPage->isLocalPage = 1;
    pPage->pCache = pCache;
    TDB_INIT_PAGE_REF(pPage);
    pPage->pHashNext = NULL;
    pPage->pLruNext = NULL;
    pPage->pLruPrev = NULL;
    pPage->pDirtyNext = NULL;

    pPage->pFreeNext = pCache->pFree;
    pCache->pFree = pPage;
    pCache->nFree++;
  }

  // Open the hash table
  pCache->nPage = 0;
  pCache->nHash = pCache->cacheSize;
  pCache->pgHash = (SPage **)calloc(pCache->nHash, sizeof(SPage *));
  if (pCache->pgHash == NULL) {
    goto _err;
  }

  // Open LRU list: an empty circular list whose only node is the anchor
  pCache->nRecyclable = 0;
  pCache->lru.isAnchor = 1;
  pCache->lru.pLruNext = &(pCache->lru);
  pCache->lru.pLruPrev = &(pCache->lru);

  return 0;

_err:
  // Give back every page already placed on the free list.
  while (pCache->pFree != NULL) {
    pPage = pCache->pFree;
    pCache->pFree = pPage->pFreeNext;
    tdbPageDestroy(pPage, tdbOsFree, NULL);
  }
  pCache->nFree = 0;
  tdbPCacheClearLock(pCache);
  return -1;
}
/* Return the page size the cache was opened with. */
int tdbPCacheGetPageSize(SPCache *pCache) {
  int pageSize = pCache->pageSize;
  return pageSize;
}
/*
 * Allocator callback backed by malloc(). `arg` is ignored.
 * Returns whatever malloc() returns for `size`.
 */
static void *tdbOsMalloc(void *arg, size_t size) {
  (void)arg;
  return malloc(size);
}
/* Deallocator callback backed by free(). `arg` is ignored. */
static void tdbOsFree(void *arg, void *ptr) {
  (void)arg;
  free(ptr);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
/* Header of a free cell on a small page: two 2-byte fields, packed. */
typedef struct __attribute__((__packed__)) {
u8 szCell[2]; // size of this free cell
u8 nxOffset[2]; // offset of the next free cell
} SFreeCell;
/* Header of a free cell on a large page: two 3-byte fields, packed. */
typedef struct __attribute__((__packed__)) {
u8 szCell[3]; // size of this free cell
u8 nxOffset[3]; // offset of the next free cell
} SFreeCellL;
/* For small page: fields are accessed as u16 through a pointer cast. */
/* NOTE(review): the cast from a packed u8[2] to u16* assumes unaligned
 * u16 access is acceptable on the target - confirm. */
#define TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL) (((SFreeCell *)(PCELL))->szCell)
#define TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) (((SFreeCell *)(PCELL))->nxOffset)
#define TDB_SPAGE_FREE_CELL_SIZE(PCELL) ((u16 *)TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL))[0]
#define TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL) ((u16 *)TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))[0]
#define TDB_SPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE) (TDB_SPAGE_FREE_CELL_SIZE(PCELL) = (SIZE))
#define TDB_SPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET) (TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL) = (OFFSET))
/* For large page: fields are read and written with the 24-bit helpers. */
#define TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL) (((SFreeCellL *)(PCELL))->szCell)
#define TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) (((SFreeCellL *)(PCELL))->nxOffset)
#define TDB_LPAGE_FREE_CELL_SIZE(PCELL) TDB_GET_U24(TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL))
#define TDB_LPAGE_FREE_CELL_NXOFFSET(PCELL) TDB_GET_U24(TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))
#define TDB_LPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE) TDB_PUT_U24(TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL), SIZE)
#define TDB_LPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET) TDB_PUT_U24(TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL), OFFSET)
/* For page */
/*
 * Page-size-agnostic accessors for free-cell fields: dispatch to the
 * large-page or small-page variant depending on PPAGE.
 *
 * The page test uses the PPAGE macro argument rather than a variable that
 * happens to be called `pPage` at the expansion site, so the macros work
 * regardless of how the caller names its page pointer.
 */
#define TDB_PAGE_FREE_CELL_SIZE_PTR(PPAGE, PCELL) \
  (TDB_IS_LARGE_PAGE(PPAGE) ? TDB_LPAGE_FREE_CELL_SIZE_PTR(PCELL) : TDB_SPAGE_FREE_CELL_SIZE_PTR(PCELL))
#define TDB_PAGE_FREE_CELL_NXOFFSET_PTR(PPAGE, PCELL) \
  (TDB_IS_LARGE_PAGE(PPAGE) ? TDB_LPAGE_FREE_CELL_NXOFFSET_PTR(PCELL) : TDB_SPAGE_FREE_CELL_NXOFFSET_PTR(PCELL))
#define TDB_PAGE_FREE_CELL_SIZE(PPAGE, PCELL) \
  (TDB_IS_LARGE_PAGE(PPAGE) ? TDB_LPAGE_FREE_CELL_SIZE(PCELL) : TDB_SPAGE_FREE_CELL_SIZE(PCELL))
#define TDB_PAGE_FREE_CELL_NXOFFSET(PPAGE, PCELL) \
  (TDB_IS_LARGE_PAGE(PPAGE) ? TDB_LPAGE_FREE_CELL_NXOFFSET(PCELL) : TDB_SPAGE_FREE_CELL_NXOFFSET(PCELL))
/* Store a free cell's size, using the large- or small-page layout of PPAGE. */
#define TDB_PAGE_FREE_CELL_SIZE_SET(PPAGE, PCELL, SIZE) \
do { \
if (TDB_IS_LARGE_PAGE(PPAGE)) { \
TDB_LPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE); \
} else { \
TDB_SPAGE_FREE_CELL_SIZE_SET(PCELL, SIZE); \
} \
} while (0)
/* Store a free cell's next-offset, using the large- or small-page layout of PPAGE. */
#define TDB_PAGE_FREE_CELL_NXOFFSET_SET(PPAGE, PCELL, OFFSET) \
do { \
if (TDB_IS_LARGE_PAGE(PPAGE)) { \
TDB_LPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET); \
} else { \
TDB_SPAGE_FREE_CELL_NXOFFSET_SET(PCELL, OFFSET); \
} \
} while (0)
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell);
static int tdbPageDefragment(SPage *pPage);
/*
 * Allocate a page: one block holding `pageSize` bytes of page data
 * followed by the SPage control structure.
 *
 * pageSize - page size; must satisfy TDB_IS_PGSIZE_VLD (asserted)
 * ppPage   - receives the new page; set to NULL on failure
 * xMalloc  - allocator callback, called as xMalloc(arg, size)
 * arg      - opaque argument forwarded to xMalloc
 *
 * Returns 0 on success, -1 when the allocator returns NULL.
 */
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg) {
  SPage *pPage;
  u8    *ptr;
  int    size;

  ASSERT(TDB_IS_PGSIZE_VLD(pageSize));

  *ppPage = NULL;
  size = pageSize + sizeof(*pPage);

  ptr = (u8 *)((*xMalloc)(arg, size));
  // Check the pointer that was just allocated, before it is written to.
  if (ptr == NULL) {
    return -1;
  }

  memset(ptr, 0, size);

  // The control structure lives right after the page data.
  pPage = (SPage *)(ptr + pageSize);
  pPage->pData = ptr;
  pPage->pageSize = pageSize;

  // Pages below 64 KiB use 2-byte offsets, larger ones 3-byte offsets.
  if (pageSize < 65536) {
    pPage->szOffset = 2;
    pPage->szPageHdr = sizeof(SPageHdr);
    pPage->szFreeCell = sizeof(SFreeCell);
  } else {
    pPage->szOffset = 3;
    pPage->szPageHdr = sizeof(SPageHdrL);
    pPage->szFreeCell = sizeof(SFreeCellL);
  }
  TDB_INIT_PAGE_LOCK(pPage);

  /* TODO */

  *ppPage = pPage;
  return 0;
}
/*
 * Release a page by passing its data pointer to the `xFree` callback,
 * called as xFree(arg, pPage->pData). Always returns 0.
 */
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
  u8 *pBlock = pPage->pData;

  (*xFree)(arg, pBlock);

  return 0;
}
/*
 * Insert a cell of `szCell` bytes at cell index `idx`.
 *
 * If the page already has overflow cells, or the cell plus one offset slot
 * does not fit in the free space, the cell pointer and index are recorded
 * in the overflow arrays instead of being copied into the page.
 * Otherwise space is allocated in the page, the cell is copied in and the
 * cell index array is shifted to make room for the new offset.
 *
 * Returns 0 on success, -1 when tdbPageAllocate() fails.
 */
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
int ret;
SCell *pTarget;
u8 *pTmp;
int j;
if (pPage->nOverflow || szCell + pPage->szOffset > pPage->nFree) {
// TODO: need to figure out if pCell may be used by outside of this function
// NOTE(review): no bound check against the capacity of apOvfl/aiOvfl here - confirm
j = pPage->nOverflow++;
pPage->apOvfl[j] = pCell;
pPage->aiOvfl[j] = idx;
} else {
ret = tdbPageAllocate(pPage, szCell, &pTarget);
if (ret < 0) {
return -1;
}
memcpy(pTarget, pCell, szCell);
// Slot for index `idx` in the cell offset array
pTmp = pPage->pCellIdx + idx * pPage->szOffset;
// tdbPageAllocate() has already advanced pFreeStart by one offset slot,
// hence the extra "- szOffset" when computing how many bytes to shift.
memmove(pTmp + pPage->szOffset, pTmp, pPage->pFreeStart - pTmp - pPage->szOffset);
TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pTarget - pPage->pData);
TDB_PAGE_NCELLS_SET(pPage, TDB_PAGE_NCELLS(pPage) + 1);
}
return 0;
}
/* Remove the cell at index `idx`. Not implemented yet: returns 0. */
int tdbPageDropCell(SPage *pPage, int idx) {
// TODO
return 0;
}
/*
 * Reserve `size` bytes for a cell plus one offset slot inside the page.
 *
 * Tries, in order: the contiguous free area between pFreeStart and
 * pFreeEnd, the page's free-cell list, and finally defragmentation
 * followed by the free area again. On success *ppCell points at the
 * reserved bytes and nFree is reduced by size + szOffset.
 *
 * Requires nFree > size + szOffset (asserted).
 * Returns 0 on success, -1 when defragmentation fails.
 *
 * NOTE(review): the free-list path does not yet unlink the chosen cell
 * (see the TODO lines), and tdbPageDefragment() is currently a stub.
 */
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell) {
SCell *pCell;
SFreeCell *pFreeCell;
u8 *pOffset;
int ret;
ASSERT(pPage->nFree > size + pPage->szOffset);
pCell = NULL;
*ppCell = NULL;
// 1. Try to allocate from the free space area
// The cell is carved from the end of the free area, the offset slot from its start.
if (pPage->pFreeEnd - pPage->pFreeStart > size + pPage->szOffset) {
pPage->pFreeEnd -= size;
pPage->pFreeStart += pPage->szOffset;
pCell = pPage->pFreeEnd;
}
// 2. Try to allocate from the page free list
// Only attempted when the free area can still hold one offset slot.
if ((pCell == NULL) && (pPage->pFreeEnd - pPage->pFreeStart >= pPage->szOffset) && TDB_PAGE_FCELL(pPage)) {
int szCell;
int nxOffset;
pCell = pPage->pData + TDB_PAGE_FCELL(pPage);
// pOffset tracks the location holding the link to the current free cell
pOffset = TDB_IS_LARGE_PAGE(pPage) ? ((SPageHdrL *)(pPage->pPageHdr))[0].fCell
: (u8 *)&(((SPageHdr *)(pPage->pPageHdr))[0].fCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
for (;;) {
// Find a cell
if (szCell >= size) {
if (szCell - size >= pPage->szFreeCell) {
// The remainder is large enough to stay on the list as a smaller free cell
SCell *pTmpCell = pCell + size;
TDB_PAGE_FREE_CELL_SIZE_SET(pPage, pTmpCell, szCell - size);
TDB_PAGE_FREE_CELL_NXOFFSET_SET(pPage, pTmpCell, nxOffset);
// TODO: *pOffset = pTmpCell - pPage->pData;
} else {
// The remainder is too small for a free-cell header
TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_NFREE(pPage) + szCell - size);
// TODO: *pOffset = nxOffset;
}
break;
}
// Not find a cell yet
if (nxOffset > 0) {
pCell = pPage->pData + nxOffset;
pOffset = TDB_PAGE_FREE_CELL_NXOFFSET_PTR(pPage, pCell);
szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
continue;
} else {
pCell = NULL;
break;
}
}
if (pCell) {
// Account for the offset slot this cell will need
pPage->pFreeStart = pPage->pFreeStart + pPage->szOffset;
}
}
// 3. Try to defragment and allocate again
if (pCell == NULL) {
ret = tdbPageDefragment(pPage);
if (ret < 0) {
return -1;
}
ASSERT(pPage->pFreeEnd - pPage->pFreeStart > size + pPage->szOffset);
ASSERT(pPage->nFree == pPage->pFreeEnd - pPage->pFreeStart);
// Allocate from the free space area again
pPage->pFreeEnd -= size;
pPage->pFreeStart += pPage->szOffset;
pCell = pPage->pFreeEnd;
}
ASSERT(pCell != NULL);
pPage->nFree = pPage->nFree - size - pPage->szOffset;
*ppCell = pCell;
return 0;
}
/* Return a cell's space to the page. Not implemented yet: returns 0. */
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int size) {
// TODO
return 0;
}
/*
 * Compact the page's free space. Not implemented yet: trips ASSERT(0)
 * and then returns 0.
 */
static int tdbPageDefragment(SPage *pPage) {
// TODO
ASSERT(0);
return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"

#include <unistd.h>
/* Pager: one database file, its journal file and the dirty-page state. */
struct SPager {
char *dbFileName; // database file path (stored right after the struct by tdbPagerOpen)
char *jFileName; // journal file path: database path with "-journal" appended
int pageSize; // page size, taken from the page cache
uint8_t fid[TDB_FILE_ID_LEN]; // file id generated by tdbGnrtFileID()
int fd; // database file descriptor
int jfd; // journal file descriptor, -1 until a transaction begins
SPCache *pCache; // page cache used to fetch pages
SPgno dbFileSize; // incremented to hand out new page numbers
SPgno dbOrigSize;
int nDirty;
SPage *pDirty;
SPage *pDirtyTail;
u8 inTran; // non-zero once tdbPagerBegin() has opened the journal
};
/* Packed file header; the static assert below pins its size to 128 bytes. */
typedef struct __attribute__((__packed__)) {
u8 hdrString[16];
u16 pageSize;
SPgno freePage;
u32 nFreePages;
u8 reserved[102]; // padding up to the fixed header size
} SFileHdr;
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
// A page counts as initialized once it has been bound to a pager.
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerReadPage(SPager *pPager, SPage *pPage);
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg);
/*
 * Open a pager on `fileName`, creating the file if needed.
 *
 * The pager object and both path strings (database file and
 * "<fileName>-journal") share one allocation.
 *
 * pCache   - page cache the pager fetches pages from; also supplies the page size
 * fileName - path of the database file
 * ppPager  - receives the pager; set to NULL on failure
 *
 * Returns 0 on success. Returns -1 on allocation failure, when the file
 * cannot be opened, or when the file id cannot be generated; the
 * allocation and any opened descriptor are released on those paths.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)calloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);
  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;
  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';
  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = open(pPager->dbFileName, O_RDWR | O_CREAT, 0755);
  if (pPager->fd < 0) {
    goto _err;
  }

  ret = tdbGnrtFileID(pPager->dbFileName, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  pPager->jfd = -1;
  pPager->pageSize = tdbPCacheGetPageSize(pCache);

  *ppPager = pPager;
  return 0;

_err:
  if (pPager->fd >= 0) {
    close(pPager->fd);
  }
  free(pPager);
  return -1;
}
/* Close a pager. Not implemented yet: releases nothing and returns 0. */
int tdbPagerClose(SPager *pPager) {
// TODO
return 0;
}
/*
 * Resolve the root page number of a database inside the pager.
 *
 * The lookup in the main DB is not implemented yet, so *ppgno is always
 * set to 0 and `toCreate` is currently ignored. Always returns 0.
 */
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
  // TODO: try to search the main DB to get the page number
  SPgno rootPgno = 0;

  // TODO: when rootPgno == 0 and toCreate is set, allocate a page,
  //       zero it and write it through the pager.

  *ppgno = rootPgno;
  return 0;
}
/*
 * Mark a page as about to be modified.
 *
 * Starts a transaction via tdbPagerBegin() if none is active, then sets
 * the page's dirty flag if it was clear. Adding the page to the dirty
 * list and journaling it are still TODO.
 *
 * Returns 0 on success, -1 when the transaction cannot be started.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  if (pPager->inTran == 0 && tdbPagerBegin(pPager) < 0) {
    return -1;
  }

  if (pPage->isDirty != 0) {
    return 0;
  }

  pPage->isDirty = 1;
  // TODO: add the page to the dirty list
  // TODO: write the page to the journal (only if actually loaded from the file)

  return 0;
}
/*
 * Begin a transaction: open (creating if needed) the journal file and set
 * the in-transaction flag. A pager already in a transaction is left as is.
 *
 * Returns 0 on success or when already in a transaction, -1 when the
 * journal cannot be opened (jfd then holds the failed open() result).
 */
int tdbPagerBegin(SPager *pPager) {
  if (!pPager->inTran) {
    // Open the journal
    int journalFd = open(pPager->jFileName, O_RDWR | O_CREAT, 0755);
    pPager->jfd = journalFd;
    if (journalFd < 0) {
      return -1;
    }

    // TODO: write the size of the file

    pPager->inTran = 1;
  }

  return 0;
}
/* Commit the current transaction. Not implemented yet: returns 0. */
int tdbPagerCommit(SPager *pPager) {
// TODO
return 0;
}
/*
 * Read one page's content from the database file into pPage->pData.
 * Page numbers are 1-based: page N starts at byte (N - 1) * pageSize.
 * The page must belong to this pager's file (asserted).
 *
 * Returns 0 on success, -1 when tdbPRead() reports a negative result.
 */
static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
  ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);

  i64 fileOffset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);

  if (tdbPRead(pPager->fd, pPage->pData, pPager->pageSize, fileOffset) < 0) {
    // TODO: handle error
    return -1;
  }

  return 0;
}
/* Return the page size recorded on the pager. */
int tdbPagerGetPageSize(SPager *pPager) {
  int pageSize = pPager->pageSize;
  return pageSize;
}
/*
 * Fetch page `pgno` of this pager's file from the page cache,
 * initializing it through tdbPagerInitPage() if it is not yet bound to a
 * pager.
 *
 * initPage/arg are forwarded to tdbPagerInitPage().
 * On success the page is stored in *ppPage and 0 is returned.
 * Returns -1 when the cache yields no page or initialization fails.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
  SPgid key;

  // Build the page id: file id followed by the page number
  memcpy(&key, pPager->fid, TDB_FILE_ID_LEN);
  key.pgno = pgno;

  // Fetch a page container from the page cache
  SPage *pFetched = tdbPCacheFetch(pPager->pCache, &key, 1);
  if (pFetched == NULL) {
    return -1;
  }

  // Initialize the page if need
  if (!TDB_PAGE_INITIALIZED(pFetched) && tdbPagerInitPage(pPager, pFetched, initPage, arg) < 0) {
    return -1;
  }

  ASSERT(TDB_PAGE_INITIALIZED(pFetched));
  ASSERT(pFetched->pPager == pPager);

  *ppPage = pFetched;
  return 0;
}
/*
 * Allocate a new page number, fetch a container for it from the page cache
 * and initialize it.
 *
 * pPager    pager owning the file
 * ppgno     receives the newly allocated page number (never 0 on success)
 * ppPage    receives the initialized page on success
 * initPage  callback used to initialize the fresh page
 * arg       opaque argument forwarded to initPage
 *
 * Returns 0 on success, -1 when allocation, the cache fetch or the
 * initialization fails.
 */
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
  SPgid  key;
  SPage *pFresh;

  // Allocate a page number
  if (tdbPagerAllocPage(pPager, ppgno) < 0) {
    return -1;
  }
  ASSERT(*ppgno != 0);

  // Fetch a page container from the page cache
  memcpy(&key, pPager->fid, TDB_FILE_ID_LEN);
  key.pgno = *ppgno;
  pFresh = tdbPCacheFetch(pPager->pCache, &key, 1);
  if (pFresh == NULL) {
    return -1;
  }
  ASSERT(!TDB_PAGE_INITIALIZED(pFresh));

  // A freshly allocated page always has to be initialized
  if (tdbPagerInitPage(pPager, pFresh, initPage, arg) < 0) {
    return -1;
  }
  ASSERT(TDB_PAGE_INITIALIZED(pFresh));
  ASSERT(pFresh->pPager == pPager);

  *ppPage = pFresh;
  return 0;
}
// Try to take a page number from the pager's free list.
// Not implemented yet: *ppgno is left untouched and 0 is returned, so
// tdbPagerAllocPage (which presets *ppgno to 0) falls through to extending
// the file.
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
// TODO: Allocate a page from the free list
return 0;
}
/*
 * Allocate a page number by growing the pager: dbFileSize is incremented
 * and its new value is stored in *ppgno. Always returns 0.
 */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  pPager->dbFileSize += 1;
  *ppgno = pPager->dbFileSize;
  return 0;
}
/*
 * Allocate a page number: first from the free list and, when that yields
 * nothing (*ppgno still 0), by extending the pager.
 *
 * Returns 0 with a non-zero *ppgno on success, -1 on failure.
 */
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = 0;

  // Try to allocate from the free list of the pager
  if (tdbPagerAllocFreePage(pPager, ppgno) < 0) {
    return -1;
  }

  if (*ppgno == 0) {
    // Allocate the page by extending the pager
    if (tdbPagerAllocNewPage(pPager, ppgno) < 0) {
      return -1;
    }
    ASSERT(*ppgno != 0);
  }

  return 0;
}
/*
 * Initialize a page exactly once, even when several callers race for it.
 *
 * The caller that wins the page's try-lock runs initPage(pPage, arg) and
 * then records the owning pager in the page. A caller that finds the lock
 * busy spins (yielding the CPU every 1000 iterations) until the page
 * reports itself initialized.
 *
 * Returns 0 on success, -1 when the lock attempt fails outright or initPage
 * returns a negative value.
 * NOTE(review): if the lock holder's initPage fails, waiters in the busy
 * branch appear to spin forever -- confirm.
 */
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg) {
  int lockState = TDB_TRY_LOCK_PAGE(pPage);

  if (lockState == P_LOCK_BUSY) {
    // Somebody else is initializing the page: wait until it is done.
    int spins = 0;
    while (!TDB_PAGE_INITIALIZED(pPage)) {
      if (++spins > 1000) {
        sched_yield();
        spins = 0;
      }
    }
    return 0;
  }

  if (lockState != P_LOCK_SUCC) {
    return -1;
  }

  // We hold the lock; re-check in case another caller finished before us.
  if (!TDB_PAGE_INITIALIZED(pPage)) {
    if ((*initPage)(pPage, arg) < 0) {
      TDB_UNLOCK_PAGE(pPage);
      return -1;
    }
    pPage->pPager = pPager;
  }

  TDB_UNLOCK_PAGE(pPage);
  return 0;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
// List of pages; the TD_DLIST macro is defined outside this file.
typedef TD_DLIST(SPage) SPgList;
// Page cache handle. pgCacheOpen allocates the handle, the `pages` pointer
// array and the hash buckets as one zero-initialized block.
struct SPgCache {
TENV * pEnv; // TENV containing this page cache
pgsz_t pgsize; // size of one page, read from the TENV by pgCacheOpen
int32_t npage; // number of page frames (cache size / page size)
SPage **pages; // npage page pointers, stored right after the handle
SPgList freeList; // frames not handed out yet; filled by pgCacheOpen
SPgList lru; // consulted by pgCacheFetch once freeList is empty
struct {
int32_t nbucket; // number of buckets (set equal to npage by pgCacheOpen)
SPgList *buckets; // bucket array, stored right after `pages`
} pght; // page hash table
};
// Pin/unpin hooks; both are still empty stubs in this file.
static void pgCachePinPage(SPage *pPage);
static void pgCacheUnpinPage(SPage *pPage);
/*
 * Create a page cache sized from the settings of pEnv.
 *
 * The handle, the array of page pointers and the hash buckets share one
 * zero-initialized allocation. Every page frame is a separate buffer that
 * holds pgSize bytes of page data followed by its SPage header, and is
 * appended to the free list.
 *
 * ppPgCache  receives the new cache on success, NULL on failure
 * pEnv       environment providing the page size and cache size
 *
 * Returns 0 on success. Returns -1 when the configured sizes do not allow at
 * least one page or when memory runs out; in that case everything allocated
 * so far is released.
 */
int pgCacheOpen(SPgCache **ppPgCache, TENV *pEnv) {
  SPgCache *pPgCache;
  SPage *   pPage;
  void *    pData;
  pgsz_t    pgSize;
  cachesz_t cacheSize;
  int32_t   npage;
  int32_t   nbucket;
  size_t    msize;

  *ppPgCache = NULL;

  pgSize = tdbEnvGetPageSize(pEnv);
  cacheSize = tdbEnvGetCacheSize(pEnv);
  // Guard the division below and refuse a cache without frames: a zero
  // bucket count would make pgCacheFetch compute a modulo by zero.
  if (pgSize <= 0 || cacheSize < pgSize) {
    return -1;
  }

  npage = cacheSize / pgSize;
  nbucket = npage;
  msize = sizeof(*pPgCache) + sizeof(SPage *) * npage + sizeof(SPgList) * nbucket;

  // Allocate the handle
  pPgCache = (SPgCache *)calloc(1, msize);
  if (pPgCache == NULL) {
    return -1;
  }

  // Init the handle
  pPgCache->pEnv = pEnv;
  pPgCache->pgsize = pgSize;
  pPgCache->npage = npage;
  pPgCache->pages = (SPage **)(&pPgCache[1]);
  pPgCache->pght.nbucket = nbucket;
  pPgCache->pght.buckets = (SPgList *)(&(pPgCache->pages[npage]));

  TD_DLIST_INIT(&(pPgCache->freeList));

  for (int32_t i = 0; i < npage; i++) {
    pData = malloc(pgSize + sizeof(SPage));
    if (pData == NULL) {
      goto _err;
    }

    // The page header sits directly behind the page data
    pPage = POINTER_SHIFT(pData, pgSize);

    pPage->pgid = TDB_IVLD_PGID;
    pPage->frameid = i;
    pPage->pData = pData;

    // add current page to the page cache
    pPgCache->pages[i] = pPage;
    TD_DLIST_APPEND_WITH_FIELD(&(pPgCache->freeList), pPage, freeNode);
  }

  // The hash buckets are left as zeroed by calloc.
#if 0
  for (int32_t i = 0; i < nbucket; i++) {
    TD_DLIST_INIT(pPgCache->pght.buckets + i);
  }
#endif

  *ppPgCache = pPgCache;
  return 0;

_err:
  // Slots are filled in order, so the first NULL slot ends the allocated run.
  for (int32_t i = 0; i < npage && pPgCache->pages[i] != NULL; i++) {
    free(pPgCache->pages[i]->pData);
  }
  free(pPgCache);
  return -1;
}
/*
 * Destroy a page cache: release every page frame, then the handle.
 * Passing NULL is allowed. Always returns 0.
 */
int pgCacheClose(SPgCache *pPgCache) {
  if (pPgCache == NULL) {
    return 0;
  }

  for (int32_t i = 0; i < pPgCache->npage; i++) {
    SPage *pPage = pPgCache->pages[i];
    if (pPage == NULL) {
      continue;
    }

    // pgCacheOpen places the SPage header inside the buffer pData points
    // to, so the header must not be touched after the free. Plain free() is
    // used because tfree() presumably writes NULL back through its argument,
    // which here would be a store into the memory just released.
    free(pPage->pData);
  }

  free(pPgCache);
  return 0;
}
// Hash of a page id: the sum of the first three 64-bit words of `fileid` plus
// the page number. NOTE(review): this reads 24 bytes through a uint64_t
// pointer, so fileid presumably is at least 24 bytes long and suitably
// aligned -- confirm against TDB_FILE_ID_LEN.
#define PG_CACHE_HASH(fileid, pgno) (((uint64_t *)(fileid))[0] + ((uint64_t *)(fileid))[1] + ((uint64_t *)(fileid))[2] + (pgno))
/*
 * Get a page frame for `pgid`.
 *
 * Lookup order: the hash bucket of the page id, then the free list, then the
 * head of the LRU list. Whatever frame is chosen is passed to
 * pgCachePinPage() before being returned.
 *
 * Returns the frame, or NULL when all three sources are empty.
 * NOTE(review): a frame taken from the free or LRU list is neither given the
 * requested pgid nor inserted into the hash table here -- presumably still
 * to be written.
 */
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid) {
  SPgList *pBucket;
  SPage *  pCand;

  // 1. Search the page hash table SPgCache.pght
  pBucket = pPgCache->pght.buckets + (PG_CACHE_HASH(pgid.fileid, pgid.pgno) % pPgCache->pght.nbucket);
  for (pCand = TD_DLIST_HEAD(pBucket); pCand; pCand = TD_DLIST_NODE_NEXT_WITH_FIELD(pCand, pghtNode)) {
    if (tdbCmprPgId(&(pCand->pgid), &pgid) == 0) {
      // Page is found, pin the page and return the page
      pgCachePinPage(pCand);
      return pCand;
    }
  }

  // 2. Check the free list
  pCand = TD_DLIST_HEAD(&(pPgCache->freeList));
  if (pCand) {
    TD_DLIST_POP_WITH_FIELD(&(pPgCache->freeList), pCand, freeNode);
    pgCachePinPage(pCand);
    return pCand;
  }

  // 3. Try to recycle a page from the LRU list
  pCand = TD_DLIST_HEAD(&(pPgCache->lru));
  if (pCand) {
    TD_DLIST_POP_WITH_FIELD(&(pPgCache->lru), pCand, lruNode);
    // TODO: remove from the hash table
    pgCachePinPage(pCand);
    return pCand;
  }

  // 4. If a memory allocator is set, try to allocate from the allocator (TODO)
  return NULL;
}
// Give a page back to the cache.
// Not implemented yet: does nothing and always returns 0.
int pgCacheRelease(SPage *pPage) {
// TODO
return 0;
}
// Pin hook called by pgCacheFetch on every frame it returns.
// Not implemented yet: empty body.
static void pgCachePinPage(SPage *pPage) {
// TODO
}
// Counterpart of pgCachePinPage.
// Not implemented yet: empty body, and nothing visible in this file calls it.
static void pgCacheUnpinPage(SPage *pPage) {
// TODO
}
#if 0
// Exposed handle
// Memory pool and memory pool file handles (defined further below).
typedef struct TDB_MPOOL TDB_MPOOL;
typedef struct TDB_MPFILE TDB_MPFILE;
// Link types for the two lists a page frame can be on.
typedef TD_DLIST_NODE(pg_t) pg_free_dlist_node_t, pg_hash_dlist_node_t;
// One page frame of the memory pool.
typedef struct pg_t {
SRWLatch rwLatch; // initialized in tdbMPoolOpen
frame_id_t frameid; // index of this frame in TDB_MPOOL.pages
pgid_t pgid; // id of the cached page; TDB_IVLD_PGID right after tdbMPoolOpen
uint8_t dirty; // cleared when a page is loaded into the frame
uint8_t rbit; // reference bit used by tdbMPoolClockEvictPage
int32_t pinRef; // frames with a non-zero value are skipped by eviction
pg_free_dlist_node_t free; // link for TDB_MPOOL.freeList
pg_hash_dlist_node_t hash; // link for a page table bucket
void * p; // page-sized data buffer allocated in tdbMPoolOpen
} pg_t;
// List of page frames.
typedef TD_DLIST(pg_t) pg_list_t;
// One bucket of the pool's file hash table: a list of registered TDB_MPFILE
// handles. The latch is taken by the register/unregister/lookup helpers
// around every access to the list.
typedef struct {
SRWLatch latch;
TD_DLIST(TDB_MPFILE);
} mpf_bucket_t;
// Memory pool: a fixed set of page frames shared by the files registered
// with it.
struct TDB_MPOOL {
int64_t cachesize; // cache size as passed to tdbMPoolOpen
pgsz_t pgsize; // size of one page
int32_t npages; // number of frames (cachesize / pgsize)
pg_t * pages; // array of npages frames
pg_list_t freeList; // frames that hold no page
frame_id_t clockHand; // next frame tdbMPoolClockEvictPage will inspect
struct {
int32_t nbucket;
pg_list_t *hashtab;
} pgtab; // page table, hash<pgid_t, pg_t>
struct {
#define MPF_HASH_BUCKETS 16
mpf_bucket_t buckets[MPF_HASH_BUCKETS];
} mpfht; // MPF hash table. MPFs using this MP will be put in this hash table
};
// Frame number idx of pool mp.
#define MP_PAGE_AT(mp, idx) (mp)->pages[idx]
// Link type used to chain files inside an mpf_bucket_t.
typedef TD_DLIST_NODE(TDB_MPFILE) td_mpf_dlist_node_t;
// A file whose pages are cached in a memory pool.
struct TDB_MPFILE {
char * fname; // file name
int fd; // fd
uint8_t fileid[TDB_FILE_ID_LEN]; // file ID
TDB_MPOOL * mp; // underlying memory pool
td_mpf_dlist_node_t node; // link inside the pool's file hash bucket
};
/*=================================================== Exposed apis ==================================================*/
// TDB_MPOOL
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsz_t pgsize);
int tdbMPoolClose(TDB_MPOOL *mp);
int tdbMPoolSync(TDB_MPOOL *mp);
// TDB_MPFILE
int tdbMPoolFileOpen(TDB_MPFILE **mpfp, const char *fname, TDB_MPOOL *mp);
int tdbMPoolFileClose(TDB_MPFILE *mpf);
int tdbMPoolFileNewPage(TDB_MPFILE *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileFreePage(TDB_MPOOL *mpf, pgno_t *pgno, void *addr);
int tdbMPoolFileGetPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFilePutPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr);
int tdbMPoolFileSync(TDB_MPFILE *mpf);
static void tdbMPoolRegFile(TDB_MPOOL *mp, TDB_MPFILE *mpf);
static void tdbMPoolUnregFile(TDB_MPOOL *mp, TDB_MPFILE *mpf);
static TDB_MPFILE *tdbMPoolGetFile(TDB_MPOOL *mp, uint8_t *fileid);
static int tdbMPoolFileReadPage(TDB_MPFILE *mpf, pgno_t pgno, void *p);
static int tdbMPoolFileWritePage(TDB_MPFILE *mpf, pgno_t pgno, const void *p);
static void tdbMPoolClockEvictPage(TDB_MPOOL *mp, pg_t **pagepp);
/*
 * Create a memory pool of cachesize / pgsize page frames.
 *
 * mpp        receives the pool on success; set to NULL on allocation failure
 * cachesize  total cache size
 * pgsize     size of one page; must satisfy TDB_IS_PGSIZE_VLD
 *
 * Returns 0 on success, -1 on an invalid page size or when memory runs out
 * (everything allocated so far is released through tdbMPoolClose).
 */
int tdbMPoolOpen(TDB_MPOOL **mpp, uint64_t cachesize, pgsz_t pgsize) {
  TDB_MPOOL *pool = NULL;

  // check parameters
  if (!TDB_IS_PGSIZE_VLD(pgsize)) {
    tdbError("invalid page size");
    return -1;
  }

  // allocate handle
  pool = (TDB_MPOOL *)calloc(1, sizeof(*pool));
  if (pool == NULL) {
    tdbError("failed to malloc memory pool handle");
    goto _err;
  }

  // initialize the handle
  pool->cachesize = cachesize;
  pool->pgsize = pgsize;
  pool->npages = cachesize / pgsize;
  pool->clockHand = 0;

  TD_DLIST_INIT(&pool->freeList);

  pool->pages = (pg_t *)calloc(pool->npages, sizeof(pg_t));
  if (pool->pages == NULL) {
    tdbError("failed to malloc memory pool pages");
    goto _err;
  }

  for (frame_id_t i = 0; i < pool->npages; i++) {
    pg_t *frame = &(pool->pages[i]);

    frame->p = malloc(pgsize);
    if (frame->p == NULL) {
      goto _err;
    }

    taosInitRWLatch(&frame->rwLatch);
    frame->frameid = i;
    frame->pgid = TDB_IVLD_PGID;

    // every frame starts out on the free list
    TD_DLIST_APPEND_WITH_FIELD(&(pool->freeList), frame, free);
  }

#define PGTAB_FACTOR 1.0
  pool->pgtab.nbucket = pool->npages / PGTAB_FACTOR;
  pool->pgtab.hashtab = (pg_list_t *)calloc(pool->pgtab.nbucket, sizeof(pg_list_t));
  if (pool->pgtab.hashtab == NULL) {
    tdbError("failed to malloc memory pool hash table");
    goto _err;
  }

  *mpp = pool;
  return 0;

_err:
  tdbMPoolClose(pool);
  *mpp = NULL;
  return -1;
}
/*
 * Destroy a memory pool: free the page table, every frame buffer, the frame
 * array and the handle. Accepts NULL and partially built pools.
 * Always returns 0.
 */
int tdbMPoolClose(TDB_MPOOL *mp) {
  if (mp == NULL) {
    return 0;
  }

  tfree(mp->pgtab.hashtab);

  if (mp->pages != NULL) {
    for (int idx = 0; idx < mp->npages; idx++) {
      tfree(mp->pages[idx].p);
    }
    free(mp->pages);
  }

  free(mp);
  return 0;
}
/*
 * Open (creating if needed) the file `fname` and register it with pool `mp`.
 *
 * mpfp   receives the file handle on success
 * fname  path of the backing file
 * mp     memory pool that will cache the file's pages
 *
 * Returns 0 on success, -1 on failure. *mpfp is set to NULL on every failure
 * except the initial handle allocation, which leaves it untouched.
 */
int tdbMPoolFileOpen(TDB_MPFILE **mpfp, const char *fname, TDB_MPOOL *mp) {
  TDB_MPFILE *file = (TDB_MPFILE *)calloc(1, sizeof(*file));
  if (file == NULL) {
    return -1;
  }

  // -1 marks "no descriptor yet" for the cleanup path
  file->fd = -1;

  file->fname = strdup(fname);
  if (file->fname == NULL) {
    goto _err;
  }

  file->fd = open(fname, O_CREAT | O_RDWR, 0755);
  if (file->fd < 0) {
    goto _err;
  }

  if (tdbGnrtFileID(fname, file->fileid, false) < 0) {
    goto _err;
  }

  // Register current MPF to MP
  tdbMPoolRegFile(mp, file);

  *mpfp = file;
  return 0;

_err:
  tdbMPoolFileClose(file);
  *mpfp = NULL;
  return -1;
}
/*
 * Close a memory pool file: unregister it from its pool, close the file
 * descriptor and free the handle. Accepts NULL and half-built handles from
 * tdbMPoolFileOpen. Always returns 0.
 */
int tdbMPoolFileClose(TDB_MPFILE *mpf) {
  if (mpf == NULL) {
    return 0;
  }

  // A registered file is linked into the pool's hash bucket; unlink it
  // before the handle is freed so the bucket does not keep a dangling entry.
  if (mpf->mp != NULL) {
    tdbMPoolUnregFile(mpf->mp, mpf);
  }

  // tdbMPoolFileOpen uses -1 as the "not opened" marker, so 0 is a valid
  // descriptor and must be closed too.
  if (mpf->fd >= 0) {
    close(mpf->fd);
  }

  tfree(mpf->fname);
  free(mpf);
  return 0;
}
// Bucket index of page (fileid, pgno) in a table of `nbuckets` buckets: the
// sum of the first three 64-bit words of fileid plus pgno, modulo nbuckets.
// `fileid` is parenthesized so that an expression argument such as `buf + 8`
// is cast as a whole instead of only its first operand.
#define MPF_GET_PAGE_BUCKETID(fileid, pgno, nbuckets)                               \
  ({                                                                                \
    uint64_t *mpfIdWords_ = (uint64_t *)(fileid);                                   \
    (mpfIdWords_[0] + mpfIdWords_[1] + mpfIdWords_[2] + (pgno)) % (nbuckets);       \
  })
// Allocate a new page in the file.
// Not implemented yet: does nothing and always returns 0.
int tdbMPoolFileNewPage(TDB_MPFILE *mpf, pgno_t *pgno, void *addr) {
// TODO
return 0;
}
// Free a page of the file.
// Not implemented yet: does nothing and always returns 0.
// NOTE(review): the first parameter is a TDB_MPOOL * although it is named
// mpf like the TDB_MPFILE * of the sibling functions -- confirm the type.
int tdbMPoolFileFreePage(TDB_MPOOL *mpf, pgno_t *pgno, void *addr) {
// TODO
return 0;
}
/*
 * Get page `pgno` of file `mpf`, loading it into the pool if necessary.
 *
 * mpf   file the page belongs to
 * pgno  page number
 * addr  address of a `void *` that receives the page buffer on success
 *
 * A page already in the pool is returned directly. Otherwise a frame is
 * taken from the free list, or evicted with the clock algorithm, and the
 * page is read from disk into it.
 *
 * Returns 0 on success, -1 when no frame is available or the read fails.
 */
int tdbMPoolFileGetPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr) {
  TDB_MPOOL *mp = mpf->mp;
  pg_list_t *pglist;
  pg_t *     pagep;

  // check if the page already in pool
  pglist = mp->pgtab.hashtab + MPF_GET_PAGE_BUCKETID(mpf->fileid, pgno, mp->pgtab.nbucket);
  pagep = TD_DLIST_HEAD(pglist);
  while (pagep) {
    if (memcmp(mpf->fileid, pagep->pgid.fileid, TDB_FILE_ID_LEN) == 0 && pgno == pagep->pgid.pgno) {
      break;
    }
    pagep = TD_DLIST_NODE_NEXT_WITH_FIELD(pagep, hash);
  }

  if (pagep) {
    // page is found
    // todo: pin the page and return
    *(void **)addr = pagep->p;
    return 0;
  }

  // page not found: obtain a container frame
  pagep = TD_DLIST_HEAD(&mp->freeList);
  if (pagep) {
    // has free page
    TD_DLIST_POP_WITH_FIELD(&(mp->freeList), pagep, free);
  } else {
    // no free page available
    tdbMPoolClockEvictPage(mp, &pagep);
    if (pagep == NULL) {
      // no available container page
      return -1;
    }

    if (pagep->dirty) {
      // TODO: Handle dirty page eviction
    }

    // The victim still sits in the bucket of the page it used to hold;
    // unlink it there before the frame is reused, otherwise it would end up
    // linked into two buckets.
    pg_list_t *oldlist =
        mp->pgtab.hashtab + MPF_GET_PAGE_BUCKETID(pagep->pgid.fileid, pagep->pgid.pgno, mp->pgtab.nbucket);
    TD_DLIST_POP_WITH_FIELD(oldlist, pagep, hash);
    pagep->pgid = TDB_IVLD_PGID;
  }

  // load page from the disk
  if (tdbMPoolFileReadPage(mpf, pgno, pagep->p) < 0) {
    // Hand the frame back so it is not lost for later requests.
    TD_DLIST_APPEND_WITH_FIELD(&(mp->freeList), pagep, free);
    return -1;
  }

  memcpy(pagep->pgid.fileid, mpf->fileid, TDB_FILE_ID_LEN);
  pagep->pgid.pgno = pgno;
  pagep->dirty = 0;
  pagep->pinRef = 1;

  // add current page to page table
  TD_DLIST_APPEND_WITH_FIELD(pglist, pagep, hash);

  // Report the buffer on the miss path as well.
  *(void **)addr = pagep->p;
  return 0;
}
// Give a page obtained with tdbMPoolFileGetPage back to the pool.
// Not implemented yet: does nothing and always returns 0.
int tdbMPoolFilePutPage(TDB_MPFILE *mpf, pgno_t pgno, void *addr) {
// TODO
return 0;
}
// Bucket index of a file in the pool's file hash table: the sum of the first
// three 64-bit words of fileid, modulo MPF_HASH_BUCKETS. `fileid` is
// parenthesized so that an expression argument is cast as a whole.
#define MPF_GET_BUCKETID(fileid)                                               \
  ({                                                                           \
    uint64_t *mpfIdWords_ = (uint64_t *)(fileid);                              \
    (mpfIdWords_[0] + mpfIdWords_[1] + mpfIdWords_[2]) % MPF_HASH_BUCKETS;     \
  })
/*
 * Register file `mpf` with pool `mp`: append it to the hash bucket selected
 * by its file id (under the bucket's write latch) and record the pool in the
 * file handle.
 */
static void tdbMPoolRegFile(TDB_MPOOL *mp, TDB_MPFILE *mpf) {
  mpf_bucket_t *bucket = mp->mpfht.buckets + MPF_GET_BUCKETID(mpf->fileid);

  taosWLockLatch(&(bucket->latch));
  TD_DLIST_APPEND_WITH_FIELD(bucket, mpf, node);
  taosWUnLockLatch(&(bucket->latch));

  mpf->mp = mp;
}
/*
 * Look up the file registered in pool `mp` under `fileid`.
 * The bucket is scanned under its read latch.
 *
 * Returns the file handle, or NULL when no registered file has that id.
 */
static TDB_MPFILE *tdbMPoolGetFile(TDB_MPOOL *mp, uint8_t *fileid) {
  mpf_bucket_t *bucket = mp->mpfht.buckets + MPF_GET_BUCKETID(fileid);
  TDB_MPFILE *  found = NULL;

  taosRLockLatch(&(bucket->latch));
  for (found = TD_DLIST_HEAD(bucket); found; found = TD_DLIST_NODE_NEXT_WITH_FIELD(found, node)) {
    if (memcmp(fileid, found->fileid, TDB_FILE_ID_LEN) == 0) {
      break;
    }
  }
  taosRUnLockLatch(&(bucket->latch));

  return found;
}
/*
 * Remove file `mpf` from the file hash table of pool `mp`.
 *
 * Does nothing for a file that was never registered (mpf->mp is NULL).
 * Otherwise the first entry with the same file id is unlinked from the
 * bucket under its write latch; that entry is asserted to be `mpf`.
 */
static void tdbMPoolUnregFile(TDB_MPOOL *mp, TDB_MPFILE *mpf) {
  mpf_bucket_t *bucket;
  TDB_MPFILE *  entry;

  if (mpf->mp == NULL) {
    return;
  }
  ASSERT(mpf->mp == mp);

  bucket = mp->mpfht.buckets + MPF_GET_BUCKETID(mpf->fileid);

  taosWLockLatch(&(bucket->latch));
  for (entry = TD_DLIST_HEAD(bucket); entry; entry = TD_DLIST_NODE_NEXT_WITH_FIELD(entry, node)) {
    if (memcmp(mpf->fileid, entry->fileid, TDB_FILE_ID_LEN) == 0) {
      TD_DLIST_POP_WITH_FIELD(bucket, entry, node);
      break;
    }
  }
  taosWUnLockLatch(&(bucket->latch));

  ASSERT(entry == mpf);
}
/*
 * Read page `pgno` of file `mpf` into buffer `p`, which must hold one page.
 *
 * The read is repeated until the whole page has arrived. The file offset is
 * computed in off_t so that large page numbers do not overflow 32 bits.
 *
 * Returns 0 when a full page was read, -1 on a read error or when the file
 * ends before the page is complete.
 */
static int tdbMPoolFileReadPage(TDB_MPFILE *mpf, pgno_t pgno, void *p) {
  pgsz_t   pgsize = mpf->mp->pgsize;
  off_t    offset = (off_t)pgno * pgsize;
  uint8_t *dst = (uint8_t *)p;
  size_t   remaining = (size_t)pgsize;

  while (remaining > 0) {
    ssize_t rsize = pread(mpf->fd, dst, remaining, offset);
    if (rsize <= 0) {
      // rsize < 0: I/O error; rsize == 0: end of file inside the page
      return -1;
    }

    dst += rsize;
    offset += rsize;
    remaining -= (size_t)rsize;
  }

  return 0;
}
/*
 * Write one page from buffer `p` to position `pgno` of file `mpf`.
 *
 * The file position is moved to the page's offset (computed in off_t to
 * avoid 32-bit overflow) and the write is repeated until the whole page is
 * on its way to the file.
 *
 * Returns 0 on success, -1 when seeking or writing fails.
 */
static int tdbMPoolFileWritePage(TDB_MPFILE *mpf, pgno_t pgno, const void *p) {
  pgsz_t         pgsize = mpf->mp->pgsize;
  off_t          offset = (off_t)pgno * pgsize;
  const uint8_t *src = (const uint8_t *)p;
  size_t         remaining = (size_t)pgsize;

  if (lseek(mpf->fd, offset, SEEK_SET) < 0) {
    return -1;
  }

  while (remaining > 0) {
    ssize_t wsize = write(mpf->fd, src, remaining);
    if (wsize <= 0) {
      // Error, or no progress at all: give up instead of looping forever.
      return -1;
    }

    src += wsize;
    remaining -= (size_t)wsize;
  }

  return 0;
}
/*
 * Pick a victim frame with the clock (second chance) algorithm.
 *
 * Starting at the clock hand, frames are visited in circular order. Pinned
 * frames (pinRef != 0) are skipped. An unpinned frame whose reference bit is
 * set gets the bit cleared and is passed over once; an unpinned frame with a
 * clear bit is chosen.
 *
 * Up to two full sweeps are made: the first may only clear reference bits,
 * so a second one is needed to find a victim among the frames just cleared.
 *
 * *pagepp receives the victim, or NULL when every frame is pinned or the
 * pool has no frames.
 */
static void tdbMPoolClockEvictPage(TDB_MPOOL *mp, pg_t **pagepp) {
  *pagepp = NULL;

  if (mp->npages <= 0) {
    // Also keeps the modulo below away from a zero divisor.
    return;
  }

  for (int64_t visits = 2 * (int64_t)mp->npages; visits > 0; visits--) {
    pg_t *pagep = mp->pages + mp->clockHand;
    mp->clockHand = (mp->clockHand + 1) % mp->npages;

    if (pagep->pinRef != 0) {
      continue;
    }

    if (pagep->rbit == 1) {
      // second chance
      pagep->rbit = 0;
      continue;
    }

    *pagepp = pagep;
    return;
  }
}
#endif
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
// Layout of the description kept in page 1 of a page file (pgFileOpen
// allocates page 1 of a new file "as a description page"). The static assert
// below the type checks that it fits into the smallest allowed page.
typedef struct SPage1 {
char magic[64];
pgno_t mdbRootPgno; // master DB root page number
pgno_t freePgno; // free list page number
uint32_t nFree; // number of free pages
} SPage1;
// Placeholder for the layout of a free-list page; no fields are defined yet.
// NOTE(review): a struct without members is a compiler extension in C.
typedef struct SFreePage {
/* TODO */
} SFreePage;
TDB_STATIC_ASSERT(sizeof(SPage1) <= TDB_MIN_PGSIZE, "TDB Page1 definition too large");
static int pgFileRead(SPgFile *pPgFile, pgno_t pgno, uint8_t *pData);
/*
 * Open the page file `fname` inside environment `pEnv`.
 *
 * ppPgFile  receives the handle on success, NULL on failure
 * fname     path of the backing file (created when missing)
 * pEnv      environment the file belongs to; must not be NULL
 *
 * The handle and a private copy of the file name share one allocation. For a
 * file with no pages yet, page 1 is allocated and fetched from the
 * environment's page cache.
 *
 * Returns 0 on success. Returns -1 when memory runs out, the file cannot be
 * opened, or its id or size cannot be determined; the file and the handle
 * are released again in that case.
 */
int pgFileOpen(SPgFile **ppPgFile, const char *fname, TENV *pEnv) {
  SPgFile *pPgFile;
  size_t   fnameLen;
  pgno_t   fsize = 0;

  *ppPgFile = NULL;
  ASSERT(pEnv != NULL);

  // create the handle; the name is stored right behind it
  fnameLen = strlen(fname);
  pPgFile = (SPgFile *)calloc(1, sizeof(*pPgFile) + fnameLen + 1);
  if (pPgFile == NULL) {
    return -1;
  }

  // init the handle
  pPgFile->fname = (char *)(&(pPgFile[1]));
  memcpy(pPgFile->fname, fname, fnameLen);
  pPgFile->fname[fnameLen] = '\0';

  pPgFile->pFile = taosOpenFile(fname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ);
  if (pPgFile->pFile == NULL) {
    goto _err;
  }

  if (tdbGnrtFileID(fname, pPgFile->fileid, false) < 0) {
    goto _err;
  }

  if (tdbGetFileSize(fname, tdbEnvGetPageSize(pEnv), &fsize) < 0) {
    goto _err;
  }

  pPgFile->fsize = fsize;
  pPgFile->lsize = fsize;

  if (pPgFile->fsize == 0) {
    // A created file
    pgno_t pgno;
    pgid_t pgid;

    pgFileAllocatePage(pPgFile, &pgno);
    ASSERT(pgno == 1);

    memcpy(pgid.fileid, pPgFile->fileid, TDB_FILE_ID_LEN);
    pgid.pgno = pgno;

    // Use the environment's cache; there is no other cache handle here.
    pgCacheFetch(tdbEnvGetPgCache(pEnv), pgid);

    // Need to allocate the first page as a description page
  } else {
    // An existing file
  }

  /* TODO: other open operations */

  // add the page file to the environment
  tdbEnvRgstPageFile(pEnv, pPgFile);
  pPgFile->pEnv = pEnv;

  *ppPgFile = pPgFile;
  return 0;

_err:
  if (pPgFile->pFile != NULL) {
    taosCloseFile(&pPgFile->pFile);
  }
  // fname lives inside the handle allocation, so one free releases both
  free(pPgFile);
  return -1;
}
/*
 * Close a page file: close the underlying file and free the handle.
 * Passing NULL is allowed. Always returns 0.
 */
int pgFileClose(SPgFile *pPgFile) {
  if (pPgFile == NULL) {
    return 0;
  }

  if (pPgFile->pFile != NULL) {
    taosCloseFile(&pPgFile->pFile);
  }

  // pgFileOpen stores the name in the same allocation as the handle
  // (fname points just past the struct), so it must not be freed on its
  // own: freeing the handle releases the name as well.
  free(pPgFile);
  return 0;
}
/*
 * Fetch page `pgno` of a page file.
 *
 * Not implemented yet: no page is looked up or loaded, and NULL is returned.
 * The outline of the intended steps is kept in the comments and in the
 * disabled code below.
 */
SPage *pgFileFetch(SPgFile *pPgFile, pgno_t pgno) {
  // Start from NULL so the unfinished function has a defined result.
  SPage *pPage = NULL;

  // 1. Fetch from the page cache
  // pgCacheFetch(pPgCache, pgid);

  // 2. If only get a page frame, no content, maybe
  //    need to load from the file
  if (1 /*page not initialized*/) {
    if (pgno < pPgFile->fsize) {
      // load the page content from the disk
      // ?? How about the freed pages ??
    } else {
      // zero the page, make the page as a empty
      // page with zero records.
    }
  }

#if 0
  SPgCache *pPgCache;
  pgid_t    pgid;

  pPgCache = pPgFile->pPgCache;
  pPage = NULL;
  memcpy(pgid.fileid, pPgFile->fileid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;

  if (pgno > pPgFile->pgFileSize) {
    // TODO
  } else {
    pPage = pgCacheFetch(pPgCache, pgid);
    if (1 /*Page is cached, no need to load from file*/) {
      return pPage;
    } else {
      // TODO: handle error
      if (pgFileRead(pPgFile, pgno, (void *)pPage) < 0) {
        // todoerr
      }
      return pPage;
    }
  }
#endif

  return pPage;
}
/*
 * Release a page obtained through the page file by handing it to
 * pgCacheRelease(). The result of that call is ignored; always returns 0.
 */
int pgFileRelease(SPage *pPage) {
  (void)pgCacheRelease(pPage);

  return 0;
}
// Write a page back to its file.
// Not implemented yet: does nothing and always returns 0.
int pgFileWrite(SPage *pPage) {
// TODO
return 0;
}
/*
 * Allocate a page number in a page file.
 *
 * The file's logical size (lsize) is incremented and its new value becomes
 * the page number stored in *pPgno, so numbering starts at 1.
 * Always returns 0.
 */
int pgFileAllocatePage(SPgFile *pPgFile, pgno_t *pPgno) {
  // TODO: for a non-empty file, try the free list first (fetch the
  //       description page and check its nFree) before growing the file.
  pPgFile->lsize += 1;
  *pPgno = pPgFile->lsize;

  return 0;
}
// Read page `pgno` of a page file into pData.
// The implementation is disabled (#if 0), so at present nothing is read and
// 0 is returned. The disabled loop retries pread on EINTR and fails with -1
// on any other error or on end of file; its page size is still a TODO.
static int pgFileRead(SPgFile *pPgFile, pgno_t pgno, uint8_t *pData) {
pgsz_t pgSize;
ssize_t rsize;
uint8_t *pTData;
size_t szToRead;
#if 0
// pgSize = ; (TODO)
pTData = pData;
szToRead = pgSize;
for (; szToRead > 0;) {
rsize = pread(pPgFile->pFile, pTData, szToRead, pgno * pgSize);
if (rsize < 0) {
if (errno == EINTR) {
continue;
} else {
return -1;
}
} else if (rsize == 0) {
return -1;
}
szToRead -= rsize;
pTData += rsize;
}
#endif
return 0;
}
\ No newline at end of file
...@@ -51,7 +51,8 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) { ...@@ -51,7 +51,8 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
// return access(pathname, flags); // return access(pathname, flags);
// } // }
int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize) { int tdbGetFileSize(const char *fname, int pgSize, SPgno *pSize) {
struct stat st;
int ret; int ret;
int64_t file_size = 0; int64_t file_size = 0;
ret = taosStatFile(fname, &file_size, NULL); ret = taosStatFile(fname, &file_size, NULL);
...@@ -63,4 +64,29 @@ int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize) { ...@@ -63,4 +64,29 @@ int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize) {
*pSize = file_size / pgSize; *pSize = file_size / pgSize;
return 0; return 0;
}
/*
 * Read up to `count` bytes from `fd` at byte offset `offset` into pData,
 * repeating pread() until the request is satisfied.
 *
 * Returns the number of bytes actually read: `count` for a complete read,
 * less when the end of the file is reached first. Returns -1 when pread()
 * fails.
 */
int tdbPRead(int fd, void *pData, int count, i64 offset) {
  u8 *pBuf = (u8 *)pData;
  int nbytes = count;  // bytes still missing
  i64 ioffset = offset;

  while (nbytes > 0) {
    int iread = pread(fd, pBuf, nbytes, ioffset);
    if (iread < 0) {
      // Report the failure; continuing with a negative count would grow
      // nbytes and move the buffer pointer backwards.
      return -1;
    } else if (iread == 0) {
      // End of file: report only what was read so far
      return count - nbytes;
    }

    nbytes -= iread;
    pBuf += iread;
    ioffset += iread;
  }

  return count;
}
\ No newline at end of file
...@@ -23,20 +23,21 @@ extern "C" { ...@@ -23,20 +23,21 @@ extern "C" {
typedef struct SBTree SBTree; typedef struct SBTree SBTree;
typedef struct SBtCursor SBtCursor; typedef struct SBtCursor SBtCursor;
// SBTree struct SBtCursor {
int btreeOpen(SBTree **ppBt, SPgFile *pPgFile); SBTree *pBt;
int btreeClose(SBTree *pBt); i8 iPage;
SPage *pPage;
// SBtCursor int idx;
int btreeCursorOpen(SBtCursor *pBtCur, SBTree *pBt); int idxStack[BTREE_MAX_DEPTH + 1];
int btreeCursorClose(SBtCursor *pBtCur); SPage *pgStack[BTREE_MAX_DEPTH + 1];
int btreeCursorMoveTo(SBtCursor *pBtCur, int kLen, const void *pKey); void *pBuf;
int btreeCursorNext(SBtCursor *pBtCur);
struct SBTree {
pgno_t root;
}; };
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt);
int tdbBtreeCursor(SBtCursor *pCur, SBTree *pBt);
int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *pVal, int vLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -13,49 +13,22 @@ ...@@ -13,49 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_PAGE_FILE_H_ #ifndef _TD_TDB_DB_H_
#define _TD_PAGE_FILE_H_ #define _TD_TDB_DB_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#pragma pack (push,1) typedef struct STDb STDb;
typedef struct {
char hdrInfo[16]; // info string
pgsz_t szPage; // page size of current file
int32_t cno; // commit number counter
pgno_t freePgno; // freelist page number
uint8_t resv[100]; // reserved space
} SPgFileHdr;
#pragma pack(pop)
#define TDB_PG_FILE_HDR_SIZE 128 int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprFn, STEnv *pEnv, STDb **ppDb);
int tdbDbClose(STDb *pDb);
TDB_STATIC_ASSERT(sizeof(SPgFileHdr) == TDB_PG_FILE_HDR_SIZE, "Page file header size if not 128"); int tdbDbDrop(STDb *pDb);
int tdbDbInsert(STDb *pDb, const void *pKey, int keyLen, const void *pVal, int valLen);
struct SPgFile {
TENV * pEnv; // env containing this page file
char * fname; // backend file name
uint8_t fileid[TDB_FILE_ID_LEN]; // file id
pgno_t lsize; // page file logical size (for count)
pgno_t fsize; // real file size on disk (for rollback)
TdFilePtr pFile;
SPgFileListNode envHash;
SPgFileListNode envPgfList;
};
int pgFileOpen(SPgFile **ppPgFile, const char *fname, TENV *pEnv);
int pgFileClose(SPgFile *pPgFile);
SPage *pgFileFetch(SPgFile *pPgFile, pgno_t pgno);
int pgFileRelease(SPage *pPage);
int pgFileWrite(SPage *pPage);
int pgFileAllocatePage(SPgFile *pPgFile, pgno_t *pPgno);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_PAGE_FILE_H_*/ #endif /*_TD_TDB_DB_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -20,11 +20,17 @@ ...@@ -20,11 +20,17 @@
extern "C" { extern "C" {
#endif #endif
const char* tdbEnvGetRootDir(TENV* pEnv); typedef struct STEnv {
SPgFile* tdbEnvGetPageFile(TENV* pEnv, const uint8_t fileid[]); char * rootDir;
SPgCache* tdbEnvGetPgCache(TENV* pEnv); char * jfname;
int tdbEnvRgstPageFile(TENV* pEnv, SPgFile* pPgFile); int jfd;
int tdbEnvRgstDB(TENV* pEnv, TDB* pDb); SPCache *pCache;
} STEnv;
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, STEnv **ppEnv);
int tdbEnvClose(STEnv *pEnv);
SPager *tdbEnvGetPager(STEnv *pEnv, const char *fname);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -19,16 +19,33 @@ ...@@ -19,16 +19,33 @@
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tdb.h" // #include "tdb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SPgFile SPgFile; typedef int8_t i8;
typedef int16_t i16;
typedef int32_t i32;
typedef int64_t i64;
typedef uint8_t u8;
typedef uint16_t u16;
typedef uint32_t u32;
typedef uint64_t u64;
// 24-bit unsigned integers are stored in three bytes, most significant byte
// first. p must be u8 *.
// TDB_GET_U24 assembles the value byte by byte so that it matches the layout
// written by TDB_PUT_U24 on every host byte order and needs no aligned
// 16-bit load.
#define TDB_GET_U24(p) (((p)[0] << 16) | ((p)[1] << 8) | (p)[2])
#define TDB_PUT_U24(p, v)              \
  do {                                 \
    int tdbU24v_ = (v);                \
    (p)[2] = tdbU24v_ & 0xff;          \
    (p)[1] = (tdbU24v_ >> 8) & 0xff;   \
    (p)[0] = (tdbU24v_ >> 16) & 0xff;  \
  } while (0)
// pgno_t // SPgno
typedef int32_t pgno_t; typedef u32 SPgno;
#define TDB_IVLD_PGNO ((pgno_t)0) #define TDB_IVLD_PGNO ((pgno_t)0)
// fileid // fileid
...@@ -37,8 +54,8 @@ typedef int32_t pgno_t; ...@@ -37,8 +54,8 @@ typedef int32_t pgno_t;
// pgid_t // pgid_t
typedef struct { typedef struct {
uint8_t fileid[TDB_FILE_ID_LEN]; uint8_t fileid[TDB_FILE_ID_LEN];
pgno_t pgno; SPgno pgno;
} pgid_t; } pgid_t, SPgid;
#define TDB_IVLD_PGID (pgid_t){0, TDB_IVLD_PGNO}; #define TDB_IVLD_PGID (pgid_t){0, TDB_IVLD_PGNO};
...@@ -61,18 +78,14 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) { ...@@ -61,18 +78,14 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
} }
} }
// framd_id_t #define TDB_IS_SAME_PAGE(pPgid1, pPgid2) (tdbCmprPgId(pPgid1, pPgid2) == 0)
typedef int32_t frame_id_t;
// pgsz_t // pgsz_t
#define TDB_MIN_PGSIZE 512 #define TDB_MIN_PGSIZE 512 // 512B
#define TDB_MAX_PGSIZE 65536 #define TDB_MAX_PGSIZE 16777216 // 16M
#define TDB_DEFAULT_PGSIZE 4096 #define TDB_DEFAULT_PGSIZE 4096
#define TDB_IS_PGSIZE_VLD(s) (((s) >= TDB_MIN_PGSIZE) && ((s) <= TDB_MAX_PGSIZE)) #define TDB_IS_PGSIZE_VLD(s) (((s) >= TDB_MIN_PGSIZE) && ((s) <= TDB_MAX_PGSIZE))
// pgoff_t
typedef pgsz_t pgoff_t;
// cache // cache
#define TDB_DEFAULT_CACHE_SIZE (256 * 4096) // 1M #define TDB_DEFAULT_CACHE_SIZE (256 * 4096) // 1M
...@@ -100,7 +113,7 @@ typedef TD_DLIST_NODE(SPgFile) SPgFileListNode; ...@@ -100,7 +113,7 @@ typedef TD_DLIST_NODE(SPgFile) SPgFileListNode;
} \ } \
} while (0) } while (0)
#define TDB_VARIANT_LEN (int)-1 #define TDB_VARIANT_LEN ((int)-1)
// page payload format // page payload format
// <keyLen> + <valLen> + [key] + [value] // <keyLen> + <valLen> + [key] + [value]
...@@ -115,18 +128,40 @@ typedef TD_DLIST_NODE(SPgFile) SPgFileListNode; ...@@ -115,18 +128,40 @@ typedef TD_DLIST_NODE(SPgFile) SPgFileListNode;
/* TODO */ \ /* TODO */ \
} while (0) } while (0)
typedef int (*FKeyComparator)(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
#define TDB_JOURNAL_NAME "tdb.journal" #define TDB_JOURNAL_NAME "tdb.journal"
#define TDB_FILENAME_LEN 128
#define TDB_DEFAULT_FANOUT 6
#define BTREE_MAX_DEPTH 20
// Flag helpers.
//   TDB_FLAG_IS     flags equals flag exactly
//   TDB_FLAG_HAS    at least one bit of flag is set in flags
//   TDB_FLAG_NO     no bit of flag is set in flags
//   TDB_FLAG_ADD    set the bits of flag in flags (modifies flags)
//   TDB_FLAG_REMOVE clear the bits of flag in flags (modifies flags)
#define TDB_FLAG_IS(flags, flag) ((flags) == (flag))
#define TDB_FLAG_HAS(flags, flag) (((flags) & (flag)) != 0)
// The mask is parenthesized: == binds tighter than &, so without the inner
// parentheses this would evaluate flags & (flag == 0).
#define TDB_FLAG_NO(flags, flag) (((flags) & (flag)) == 0)
#define TDB_FLAG_ADD(flags, flag) ((flags) |= (flag))
#define TDB_FLAG_REMOVE(flags, flag) ((flags) &= (~(flag)))
typedef struct SPager SPager;
typedef struct SPCache SPCache;
typedef struct SPage SPage;
#include "tdbUtil.h" #include "tdbUtil.h"
#include "tdbBtree.h" #include "tdbPCache.h"
#include "tdbPgCache.h" #include "tdbPager.h"
#include "tdbPgFile.h" #include "tdbBtree.h"
#include "tdbEnv.h" #include "tdbEnv.h"
#include "tdbDb.h"
#include "tdbPage.h"
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,26 +20,25 @@ ...@@ -20,26 +20,25 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SPgCache SPgCache; #define TDB_PCACHE_PAGE \
typedef struct SPage SPage; u8 isAnchor; \
u8 isLocalPage; \
// SPgCache u8 isDirty; \
int pgCacheOpen(SPgCache **ppPgCache, TENV *pEnv); i32 nRef; \
int pgCacheClose(SPgCache *pPgCache); SPCache *pCache; \
SPage *pFreeNext; \
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid); SPage *pHashNext; \
int pgCacheRelease(SPage *pPage); SPage *pLruNext; \
SPage *pLruPrev; \
// SPage SPage *pDirtyNext; \
typedef TD_DLIST_NODE(SPage) SPgListNode; SPager *pPager; \
struct SPage { SPgid pgid;
pgid_t pgid; // page id
frame_id_t frameid; // frame id int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
uint8_t * pData; // real data int tdbPCacheClose(SPCache *pCache);
SPgListNode freeNode; // for SPgCache.freeList SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);
SPgListNode pghtNode; // for pght void tdbPCacheRelease(SPage *pPage);
SPgListNode lruNode; // for LRU int tdbPCacheGetPageSize(SPCache *pCache);
};
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_PAGE_H_
#define _TDB_PAGE_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef u8 SCell;
// PAGE APIS implemented
// Table of accessors for the fields of a page header. A page reaches its
// table through SPage.pPageMethods, and the TDB_PAGE_* macros below dispatch
// through it. Presumably there is one table per on-page layout -- confirm.
typedef struct {
int szOffset; // exposed through TDB_PAGE_OFFSET_SIZE
int szPageHdr;
int szFreeCell;
// flags
u16 (*getFlags)(SPage *);
void (*setFlags)(SPage *, u16);
// cell number
int (*getCellNum)(SPage *);
void (*setCellNum)(SPage *, int);
// cell content offset
int (*getCellBody)(SPage *);
void (*setCellBody)(SPage *, int);
// first free cell offset (0 means no free cells)
int (*getCellFree)(SPage *);
void (*setCellFree)(SPage *, int);
// total free bytes
int (*getFreeBytes)(SPage *);
void (*setFreeBytes)(SPage *, int);
// cell offset at idx
int (*getCellOffset)(SPage *, int);
void (*setCellOffset)(SPage *, int, int);
} SPageMethods;
// Page footer
// Packed, so the footer is exactly the 4 bytes of cksm with no padding.
// NOTE(review): cksm is presumably a checksum of the page -- confirm.
typedef struct __attribute__((__packed__)) {
u8 cksm[4];
} SPageFtr;
// In-memory page: the data buffer plus the bookkeeping used by the pager,
// the access method and the page cache.
struct SPage {
pthread_spinlock_t lock; // operated on by the TDB_*_PAGE lock macros below
u8 *pData; // page content; TDB_PAGE_CELL_AT adds cell offsets to it
int pageSize;
SPageMethods *pPageMethods; // accessor table used by the TDB_PAGE_* macros
// Fields below used by pager and am
u8 szAmHdr;
u8 *pPageHdr;
u8 *pAmHdr;
u8 *pCellIdx;
u8 *pFreeStart;
u8 *pFreeEnd;
SPageFtr *pPageFtr;
int kLen; // key length of the page, -1 for unknown
int vLen; // value length of the page, -1 for unknown
int nFree;
int maxLocal;
int minLocal;
int nOverflow;
SCell *apOvfl[4];
int aiOvfl[4];
// Fields used by SPCache
// (TDB_PCACHE_PAGE is a macro that expands to the cache's member list)
TDB_PCACHE_PAGE
};
/* For page */
// Getters: each calls the matching function of the page's SPageMethods table.
#define TDB_PAGE_FLAGS(pPage) (*(pPage)->pPageMethods->getFlags)(pPage)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)
#define TDB_PAGE_CCELLS(pPage) (*(pPage)->pPageMethods->getCellBody)(pPage)
#define TDB_PAGE_FCELL(pPage) (*(pPage)->pPageMethods->getCellFree)(pPage)
#define TDB_PAGE_NFREE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_CELL_OFFSET_AT(pPage, idx) (*(pPage)->pPageMethods->getCellOffset)(pPage, idx)
// Setters: same dispatch, through the set* functions.
#define TDB_PAGE_FLAGS_SET(pPage, FLAGS) (*(pPage)->pPageMethods->setFlags)(pPage, FLAGS)
#define TDB_PAGE_NCELLS_SET(pPage, NCELLS) (*(pPage)->pPageMethods->setCellNum)(pPage, NCELLS)
#define TDB_PAGE_CCELLS_SET(pPage, CCELLS) (*(pPage)->pPageMethods->setCellBody)(pPage, CCELLS)
#define TDB_PAGE_FCELL_SET(pPage, FCELL) (*(pPage)->pPageMethods->setCellFree)(pPage, FCELL)
#define TDB_PAGE_NFREE_SET(pPage, NFREE) (*(pPage)->pPageMethods->setFreeBytes)(pPage, NFREE)
#define TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, OFFSET) (*(pPage)->pPageMethods->setCellOffset)(pPage, idx, OFFSET)
// szOffset field of the page's method table.
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
// Address of cell idx: the page data pointer plus the cell's offset.
#define TDB_PAGE_CELL_AT(pPage, idx) ((pPage)->pData + TDB_PAGE_CELL_OFFSET_AT(pPage, idx))
// For page lock
// Result codes of TDB_TRY_LOCK_PAGE.
#define P_LOCK_SUCC 0
#define P_LOCK_BUSY 1
#define P_LOCK_FAIL (-1)
// Thin wrappers around the spinlock embedded in a page.
#define TDB_INIT_PAGE_LOCK(pPage) pthread_spin_init(&((pPage)->lock), 0)
#define TDB_DESTROY_PAGE_LOCK(pPage) pthread_spin_destroy(&((pPage)->lock))
#define TDB_LOCK_PAGE(pPage) pthread_spin_lock(&((pPage)->lock))
#define TDB_UNLOCK_PAGE(pPage) pthread_spin_unlock(&((pPage)->lock))
// Try to take the page lock without blocking. Evaluates to P_LOCK_SUCC when
// the lock was acquired, P_LOCK_BUSY when somebody else holds it and
// P_LOCK_FAIL on any other error.
// pthread_spin_trylock() reports errors through its return value and does
// not set errno, so the return value is what gets compared against EBUSY.
#define TDB_TRY_LOCK_PAGE(pPage)                                        \
  ({                                                                    \
    int tdbLockRc_ = pthread_spin_trylock(&((pPage)->lock));            \
    int tdbLockState_;                                                  \
    if (tdbLockRc_ == 0) {                                              \
      tdbLockState_ = P_LOCK_SUCC;                                      \
    } else if (tdbLockRc_ == EBUSY) {                                   \
      tdbLockState_ = P_LOCK_BUSY;                                      \
    } else {                                                            \
      tdbLockState_ = P_LOCK_FAIL;                                      \
    }                                                                   \
    tdbLockState_;                                                      \
  })
// APIs
// Create a page of pageSize bytes. The memory is obtained by calling
// xMalloc(arg, size); on success *ppPage receives the new page and 0 is
// returned, otherwise -1 is returned and *ppPage is NULL.
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
// Release a page created by tdbPageCreate by handing its buffer to
// xFree(arg, ptr). Returns 0.
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
// Insert the szCell-byte cell pCell at cell index idx. When the page has no
// room (or already holds overflow cells) the cell pointer is recorded in the
// page's overflow arrays instead of being copied. Returns 0 on success, -1 on
// failure.
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell);
// Drop the cell at index idx. NOTE(review): the implementation in this commit
// is still a TODO stub that only returns 0.
int tdbPageDropCell(SPage *pPage, int idx);
#ifdef __cplusplus
}
#endif
#endif /*_TDB_PAGE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_PAGER_H_
#define _TDB_PAGER_H_
#ifdef __cplusplus
extern "C" {
#endif
// Pager API.
// NOTE(review): the implementations are not part of this header; the one-line
// summaries below are inferred from names and signatures only -- confirm
// against tdbPager.c before relying on them.

// Presumably opens a pager on file `fileName` backed by page cache `pCache`
// and returns it through *ppPager.
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
// Presumably closes a pager obtained from tdbPagerOpen.
int tdbPagerClose(SPager *pPager);
// Presumably looks up the DB root page number (creating it when `toCreate`
// is true) and stores it in *ppgno -- TODO confirm.
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
// Presumably marks/registers `pPage` as being written through this pager.
int tdbPagerWrite(SPager *pPager, SPage *pPage);
// Presumably begin / commit a write transaction on the pager.
int tdbPagerBegin(SPager *pPager);
int tdbPagerCommit(SPager *pPager);
// Presumably returns the page size used by the pager.
int tdbPagerGetPageSize(SPager *pPager);
// Fetch page `pgno` / allocate a new page; `initPage(page, arg)` is a caller
// supplied callback -- presumably invoked to initialize the page -- TODO
// confirm when exactly it is called.
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
#ifdef __cplusplus
}
#endif
#endif /*_TDB_PAGER_H_*/
\ No newline at end of file
...@@ -35,7 +35,48 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique); ...@@ -35,7 +35,48 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique);
// #define TDB_W_OK 0x4 // #define TDB_W_OK 0x4
// int tdbCheckFileAccess(const char *pathname, int mode); // int tdbCheckFileAccess(const char *pathname, int mode);
int tdbGetFileSize(const char *fname, pgsz_t pgSize, pgno_t *pSize); int tdbGetFileSize(const char *fname, int pgSize, SPgno *pSize);
// NOTE(review): implementation not visible here; judging by the signature this
// presumably reads `count` bytes from file descriptor `fd` at byte position
// `offset` into `pData` -- confirm in tdbUtil.c.
int tdbPRead(int fd, void *pData, int count, i64 offset);
/*
 * Encode `v` as a variable-length integer into `p`.
 *
 * The value is emitted 7 bits at a time, least-significant group first; every
 * byte except the last has its high bit (0x80) set as a continuation marker.
 *
 * The bit pattern of `v` is encoded as an unsigned quantity. Comparing the
 * signed value against 0x7f would make every negative number take the
 * single-byte path and be truncated to one byte, which may even carry the
 * continuation bit and confuse the decoder.
 *
 * @param p  output buffer; at most 5 bytes are written for a 32-bit int
 * @param v  value to encode
 * @return   number of bytes written
 */
static inline int tdbPutVarInt(u8 *p, int v) {
  unsigned int uv = (unsigned int)v;
  int          n = 0;

  while (uv > 0x7f) {
    p[n++] = (u8)((uv & 0x7f) | 0x80);
    uv >>= 7;
  }
  p[n++] = (u8)uv;

  ASSERT(n < 6);
  return n;
}
/*
 * Decode a variable-length integer written by tdbPutVarInt.
 *
 * tdbPutVarInt emits the least-significant 7-bit group first, so the k-th
 * byte contributes its low 7 bits at bit position 7*k. (Accumulating with
 * `tv = (tv << 7) | bits` would treat the first byte as the MOST significant
 * group and return wrong values for every multi-byte encoding.)
 *
 * The accumulator is unsigned so that shifting into the top bits is well
 * defined; the result is converted back to int at the end.
 *
 * @param p  encoded bytes
 * @param v  receives the decoded value
 * @return   number of bytes consumed (1..5)
 */
static inline int tdbGetVarInt(const u8 *p, int *v) {
  unsigned int tv = 0;
  int          n = 0;

  for (;;) {
    // An int never needs more than 5 groups; check before reading/shifting so
    // a malformed stream cannot cause an out-of-range shift.
    ASSERT(n < 5);

    u8 b = p[n];
    tv |= (unsigned int)(b & 0x7f) << (7 * n);
    n++;

    if (b <= 0x7f) {
      break;  // high bit clear: this was the last byte
    }
  }

  *v = (int)tv;
  return n;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
// Method tables selected per page in tdbPageCreate: pageMethods for page sizes
// below 65536, pageLargeMethods otherwise.
extern SPageMethods pageMethods;
extern SPageMethods pageLargeMethods;
// Packed two-field record with 16-bit members, used as the free-cell size
// (sizeof(SFreeCell)) of the small-page method table.
// NOTE(review): judging by the field names this is the header stored in a
// free cell -- its size and the offset of the next free cell -- confirm once
// the free-list code is enabled.
typedef struct __attribute__((__packed__)) {
u16 szCell;
u16 nxOffset;
} SFreeCell;
// Forward declarations of file-local helpers defined further down.
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell);
static int tdbPageDefragment(SPage *pPage);
/*
 * Allocate and initialize a page.
 *
 * A single buffer of pageSize + sizeof(SPage) bytes is requested from
 * xMalloc(arg, size) and zero-filled. The first pageSize bytes are the page
 * data (pPage->pData); the SPage control structure sits directly behind them.
 * The page method table is chosen from the page size (pages below 65536 bytes
 * use pageMethods, larger ones pageLargeMethods) and the page lock is
 * initialized.
 *
 * @param pageSize  page size in bytes, must satisfy TDB_IS_PGSIZE_VLD
 * @param ppPage    receives the new page; set to NULL on failure
 * @param xMalloc   allocator callback
 * @param arg       opaque argument forwarded to xMalloc
 * @return 0 on success, -1 if the allocator returns NULL
 */
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg) {
  SPage *pPage;
  u8    *ptr;
  int    size;

  ASSERT(TDB_IS_PGSIZE_VLD(pageSize));

  *ppPage = NULL;
  size = pageSize + sizeof(*pPage);

  ptr = (u8 *)((*xMalloc)(arg, size));
  // Check the pointer that was just allocated. (pPage is not assigned yet at
  // this point, so testing it would read an uninitialized variable and let a
  // failed allocation fall through to memset(NULL, ...).)
  if (ptr == NULL) {
    return -1;
  }

  memset(ptr, 0, size);
  pPage = (SPage *)(ptr + pageSize);

  pPage->pData = ptr;
  pPage->pageSize = pageSize;
  if (pageSize < 65536) {
    pPage->pPageMethods = &pageMethods;
  } else {
    pPage->pPageMethods = &pageLargeMethods;
  }
  TDB_INIT_PAGE_LOCK(pPage);

  /* TODO */

  *ppPage = pPage;
  return 0;
}
/*
 * Release a page created by tdbPageCreate.
 *
 * The buffer starting at pPage->pData is handed to xFree(arg, ptr). Since
 * tdbPageCreate places the SPage structure inside that same buffer, pPage
 * itself must not be used after this call.
 *
 * @return always 0
 */
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
  xFree(arg, pPage->pData);
  return 0;
}
/*
 * Insert a cell into a page at cell index idx.
 *
 * If the page already carries overflow cells, or the cell plus one offset
 * entry does not fit into the remaining free bytes, the cell is NOT copied:
 * its pointer and target index are recorded in pPage->apOvfl / pPage->aiOvfl.
 * Otherwise space is allocated inside the page, the cell is copied there, the
 * offset array is shifted to open slot idx, and the cell count is bumped.
 *
 * @param pPage   target page
 * @param idx     cell index at which the cell is inserted
 * @param pCell   cell content
 * @param szCell  size of the cell in bytes
 * @return 0 on success; -1 if in-page allocation fails or the overflow table
 *         is already full
 */
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
  int    ret;
  SCell *pTarget;
  u8    *pTmp;
  int    j;

  if (pPage->nOverflow || szCell + TDB_PAGE_OFFSET_SIZE(pPage) > pPage->nFree) {
    // The overflow arrays have a fixed capacity; refuse the insert instead of
    // writing past their end.
    if (pPage->nOverflow >= (int)(sizeof(pPage->apOvfl) / sizeof(pPage->apOvfl[0]))) {
      return -1;
    }

    // TODO: need to figure out if pCell may be used by outside of this function
    j = pPage->nOverflow++;

    pPage->apOvfl[j] = pCell;
    pPage->aiOvfl[j] = idx;
  } else {
    ret = tdbPageAllocate(pPage, szCell, &pTarget);
    if (ret < 0) {
      return -1;
    }

    memcpy(pTarget, pCell, szCell);

    // tdbPageAllocate has already advanced pFreeStart by one offset entry, so
    // the entries to shift end at pFreeStart - szOffset.
    pTmp = pPage->pCellIdx + idx * TDB_PAGE_OFFSET_SIZE(pPage);
    memmove(pTmp + TDB_PAGE_OFFSET_SIZE(pPage), pTmp, pPage->pFreeStart - pTmp - TDB_PAGE_OFFSET_SIZE(pPage));
    TDB_PAGE_CELL_OFFSET_AT_SET(pPage, idx, pTarget - pPage->pData);
    TDB_PAGE_NCELLS_SET(pPage, TDB_PAGE_NCELLS(pPage) + 1);
  }

  return 0;
}
/*
 * Drop the cell at index idx from the page.
 *
 * Not implemented yet: the arguments are ignored and 0 is returned.
 */
int tdbPageDropCell(SPage *pPage, int idx) {
  (void)pPage;
  (void)idx;
  // TODO
  return 0;
}
/*
 * Reserve `size` bytes of cell space plus one offset-array entry in a page.
 *
 * Strategy:
 *   1. carve the cell from the end of the contiguous free gap
 *      [pFreeStart, pFreeEnd) and the offset entry from its start;
 *   2. (disabled, work in progress) reuse a block from the page free list;
 *   3. defragment the page and carve from the gap again.
 *
 * The caller (tdbPageInsertCell) only takes this path when
 * size + szOffset <= nFree, so an exact fit is legal; all capacity checks
 * below are therefore inclusive (>=). A strict comparison would trip the
 * assertion, or force a needless defragment, when the request consumes the
 * free space exactly.
 *
 * @param pPage   page to allocate from
 * @param size    number of cell bytes requested
 * @param ppCell  receives the start of the reserved cell space
 * @return 0 on success, -1 if defragmentation fails
 */
static int tdbPageAllocate(SPage *pPage, int size, SCell **ppCell) {
  SCell     *pCell;
  SFreeCell *pFreeCell;
  u8        *pOffset;
  int        ret;

  ASSERT(pPage->nFree >= size + TDB_PAGE_OFFSET_SIZE(pPage));

  pCell = NULL;
  *ppCell = NULL;

  // 1. Try to allocate from the free space area
  if (pPage->pFreeEnd - pPage->pFreeStart >= size + TDB_PAGE_OFFSET_SIZE(pPage)) {
    pPage->pFreeEnd -= size;
    pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
    pCell = pPage->pFreeEnd;
  }

  // 2. Try to allocate from the page free list
  if ((pCell == NULL) && (pPage->pFreeEnd - pPage->pFreeStart >= TDB_PAGE_OFFSET_SIZE(pPage)) &&
      TDB_PAGE_FCELL(pPage)) {
#if 0
    int szCell;
    int nxOffset;

    pCell = pPage->pData + TDB_PAGE_FCELL(pPage);
    pOffset = TDB_IS_LARGE_PAGE(pPage) ? ((SPageHdrL *)(pPage->pPageHdr))[0].fCell
                                       : (u8 *)&(((SPageHdr *)(pPage->pPageHdr))[0].fCell);
    szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
    nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);

    for (;;) {
      // Find a cell
      if (szCell >= size) {
        if (szCell - size >= pPage->szFreeCell) {
          SCell *pTmpCell = pCell + size;

          TDB_PAGE_FREE_CELL_SIZE_SET(pPage, pTmpCell, szCell - size);
          TDB_PAGE_FREE_CELL_NXOFFSET_SET(pPage, pTmpCell, nxOffset);
          // TODO: *pOffset = pTmpCell - pPage->pData;
        } else {
          TDB_PAGE_NFREE_SET(pPage, TDB_PAGE_NFREE(pPage) + szCell - size);
          // TODO: *pOffset = nxOffset;
        }
        break;
      }

      // Not find a cell yet
      if (nxOffset > 0) {
        pCell = pPage->pData + nxOffset;
        pOffset = TDB_PAGE_FREE_CELL_NXOFFSET_PTR(pPage, pCell);
        szCell = TDB_PAGE_FREE_CELL_SIZE(pPage, pCell);
        nxOffset = TDB_PAGE_FREE_CELL_NXOFFSET(pPage, pCell);
        continue;
      } else {
        pCell = NULL;
        break;
      }
    }

    if (pCell) {
      pPage->pFreeStart = pPage->pFreeStart + pPage->szOffset;
    }
#endif
  }

  // 3. Try to defragment and allocate again
  if (pCell == NULL) {
    ret = tdbPageDefragment(pPage);
    if (ret < 0) {
      return -1;
    }

    // After defragmentation all free bytes must form one contiguous gap.
    ASSERT(pPage->pFreeEnd - pPage->pFreeStart >= size + TDB_PAGE_OFFSET_SIZE(pPage));
    ASSERT(pPage->nFree == pPage->pFreeEnd - pPage->pFreeStart);

    // Allocate from the free space area again
    pPage->pFreeEnd -= size;
    pPage->pFreeStart += TDB_PAGE_OFFSET_SIZE(pPage);
    pCell = pPage->pFreeEnd;
  }

  ASSERT(pCell != NULL);

  // Account for both the cell body and its offset-array entry.
  pPage->nFree = pPage->nFree - size - TDB_PAGE_OFFSET_SIZE(pPage);
  *ppCell = pCell;
  return 0;
}
/*
 * Give the space of a cell back to the page.
 *
 * Not implemented yet: every argument is ignored and 0 is returned.
 */
static int tdbPageFree(SPage *pPage, int idx, SCell *pCell, int size) {
  (void)pPage;
  (void)idx;
  (void)pCell;
  (void)size;
  // TODO
  return 0;
}
/*
 * Defragment the free space of a page.
 *
 * Not implemented yet: reaching this function trips ASSERT(0); if assertions
 * are compiled out it returns 0 without touching the page.
 */
static int tdbPageDefragment(SPage *pPage) {
  (void)pPage;
  // TODO
  ASSERT(0);
  return 0;
}
/* ---------------------------------------------------------------------------------------------------------- */
// Page header layout used through pPage->pPageHdr by the small-page accessors
// below (pages smaller than 65536 bytes use this table, see tdbPageCreate).
// Every field is 16 bits wide and the struct is packed, so
// sizeof(SPageHdr) == 10.
typedef struct __attribute__((__packed__)) {
u16 flags;
u16 cellNum;
u16 cellBody;
u16 cellFree;
u16 nFree;
} SPageHdr;
// Accessors for pages using the 16-bit SPageHdr layout. Each one views
// pPage->pPageHdr as an SPageHdr; setters assert that the value fits in 16 bits.

// flags
static inline u16 getPageFlags(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  return pHdr->flags;
}

static inline void setPageFlags(SPage *pPage, u16 flags) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  pHdr->flags = flags;
}

// cellNum
static inline int getPageCellNum(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  return pHdr->cellNum;
}

static inline void setPageCellNum(SPage *pPage, int cellNum) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  ASSERT(cellNum < 65536);
  pHdr->cellNum = (u16)cellNum;
}

// cellBody
static inline int getPageCellBody(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  return pHdr->cellBody;
}

static inline void setPageCellBody(SPage *pPage, int cellBody) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  ASSERT(cellBody < 65536);
  pHdr->cellBody = (u16)cellBody;
}

// cellFree
static inline int getPageCellFree(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  return pHdr->cellFree;
}

static inline void setPageCellFree(SPage *pPage, int cellFree) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  ASSERT(cellFree < 65536);
  pHdr->cellFree = (u16)cellFree;
}

// nFree
static inline int getPageNFree(SPage *pPage) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  return pHdr->nFree;
}

static inline void setPageNFree(SPage *pPage, int nFree) {
  SPageHdr *pHdr = (SPageHdr *)pPage->pPageHdr;
  ASSERT(nFree < 65536);
  pHdr->nFree = (u16)nFree;
}

// cell offset: the offset array at pPage->pCellIdx holds one u16 per cell
static inline int getPageCellOffset(SPage *pPage, int idx) {
  u16 *aOffset = (u16 *)pPage->pCellIdx;
  ASSERT(idx >= 0 && idx < getPageCellNum(pPage));
  return aOffset[idx];
}

static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
  u16 *aOffset = (u16 *)pPage->pCellIdx;
  ASSERT(offset < 65536);
  aOffset[idx] = (u16)offset;
}
// Method table for pages with the 16-bit header layout (SPageHdr, 2-byte cell
// offsets). tdbPageCreate attaches it to pages smaller than 65536 bytes.
// The initializer is positional: entries must stay in the member order of
// SPageMethods (declared elsewhere); the trailing comments name the slot each
// entry is meant to fill.
SPageMethods pageMethods = {
2, // szOffset
sizeof(SPageHdr), // szPageHdr
sizeof(SFreeCell), // szFreeCell
getPageFlags, // getFlags
setPageFlags, // setFlags
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset // setCellOffset
};
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
// Page header layout used through pPage->pPageHdr by the large-page accessors
// in this file. Apart from the 16-bit flags, every field is a 3-byte (24-bit)
// integer read and written with TDB_GET_U24 / TDB_PUT_U24. Packed, so
// sizeof(SPageHdrL) == 14.
typedef struct __attribute__((__packed__)) {
u16 flags;
u8 cellNum[3];
u8 cellBody[3];
u8 cellFree[3];
u8 nFree[3];
} SPageHdrL;
// Packed record of two 3-byte fields; only its size is used here
// (szFreeCell of pageLargeMethods).
// NOTE(review): by its field names this is the large-page free-cell header
// (size of the free cell, offset of the next one) -- confirm once the
// free-list code is enabled.
typedef struct __attribute__((__packed__)) {
u8 szCell[3];
u8 nxOffset[3];
} SFreeCellL;
// Accessors for pages using the SPageHdrL layout (24-bit fields, 3-byte cell
// offsets). Each one views pPage->pPageHdr as an SPageHdrL.
//
// The setters assert that the value fits in 24 bits, mirroring the 16-bit
// range assertions of the small-page accessors; without the check an
// out-of-range value would be silently truncated when stored in 3 bytes.

// flags
static inline u16 getPageFlags(SPage *pPage) { return ((SPageHdrL *)(pPage->pPageHdr))[0].flags; }

static inline void setPageFlags(SPage *pPage, u16 flags) { ((SPageHdrL *)(pPage->pPageHdr))[0].flags = flags; }

// cellNum
static inline int getPageCellNum(SPage *pPage) { return TDB_GET_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellNum); }

static inline void setPageCellNum(SPage *pPage, int cellNum) {
  ASSERT(cellNum < (1 << 24));
  TDB_PUT_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellNum, cellNum);
}

// cellBody
static inline int getPageCellBody(SPage *pPage) { return TDB_GET_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellBody); }

static inline void setPageCellBody(SPage *pPage, int cellBody) {
  ASSERT(cellBody < (1 << 24));
  TDB_PUT_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellBody, cellBody);
}

// cellFree
static inline int getPageCellFree(SPage *pPage) { return TDB_GET_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellFree); }

static inline void setPageCellFree(SPage *pPage, int cellFree) {
  ASSERT(cellFree < (1 << 24));
  TDB_PUT_U24(((SPageHdrL *)(pPage->pPageHdr))[0].cellFree, cellFree);
}

// nFree
static inline int getPageNFree(SPage *pPage) { return TDB_GET_U24(((SPageHdrL *)(pPage->pPageHdr))[0].nFree); }

static inline void setPageNFree(SPage *pPage, int nFree) {
  ASSERT(nFree < (1 << 24));
  TDB_PUT_U24(((SPageHdrL *)(pPage->pPageHdr))[0].nFree, nFree);
}

// cell offset: the offset array at pPage->pCellIdx holds 3 bytes per cell
static inline int getPageCellOffset(SPage *pPage, int idx) {
  ASSERT(idx >= 0 && idx < getPageCellNum(pPage));
  return TDB_GET_U24(pPage->pCellIdx + 3 * idx);
}

static inline void setPageCellOffset(SPage *pPage, int idx, int offset) {
  ASSERT(offset < (1 << 24));
  TDB_PUT_U24(pPage->pCellIdx + 3 * idx, offset);
}
// Method table for pages with the 24-bit header layout (SPageHdrL, 3-byte
// cell offsets). tdbPageCreate attaches it to pages of 65536 bytes or more.
// The initializer is positional: entries must stay in the member order of
// SPageMethods (declared elsewhere); the trailing comments name the slot each
// entry is meant to fill.
SPageMethods pageLargeMethods = {
3, // szOffset
sizeof(SPageHdrL), // szPageHdr
sizeof(SFreeCellL), // szFreeCell
getPageFlags, // getFlags
setPageFlags, // setFlags
getPageCellNum, // getCellNum
setPageCellNum, // setCellNum
getPageCellBody, // getCellBody
setPageCellBody, // setCellBody
getPageCellFree, // getCellFree
setPageCellFree, // setCellFree
getPageNFree, // getFreeBytes
setPageNFree, // setFreeBytes
getPageCellOffset, // getCellOffset
setPageCellOffset // setCellOffset
};
\ No newline at end of file
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "tdb.h" #include "tdbInt.h"
TEST(tdb_test, simple_test) { TEST(tdb_test, simple_test) {
TENV * pEnv; int ret;
TDB * pDb1, *pDb2, *pDb3; STEnv *pEnv;
pgsz_t pgSize = 1024; STDb *pDb;
cachesz_t cacheSize = 10240;
// Open Env
// ENV ret = tdbEnvOpen("tdb", 1024, 20, &pEnv);
GTEST_ASSERT_EQ(tdbEnvCreate(&pEnv, "./testtdb"), 0); GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(tdbEnvSetCache(pEnv, pgSize, cacheSize), 0); // Create a database
ret = tdbDbOpen("db.db", TDB_VARIANT_LEN, TDB_VARIANT_LEN, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(tdbEnvGetCacheSize(pEnv), cacheSize); GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(tdbEnvGetPageSize(pEnv), pgSize); { // Insert some data
char key[64];
GTEST_ASSERT_EQ(tdbEnvOpen(pEnv), 0); char val[64];
#if 1 for (int i = 1; i <= 1000; i++) {
// DB sprintf(key, "key%d", i);
GTEST_ASSERT_EQ(tdbCreate(&pDb1), 0); sprintf(val, "value%d", i);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
// GTEST_ASSERT_EQ(tdbSetKeyLen(pDb1, 8), 0); GTEST_ASSERT_EQ(ret, 0);
}
// GTEST_ASSERT_EQ(tdbGetKeyLen(pDb1), 8); }
// GTEST_ASSERT_EQ(tdbSetValLen(pDb1, 3), 0); ret = tdbDbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// GTEST_ASSERT_EQ(tdbGetValLen(pDb1), 3);
// Close a database
// GTEST_ASSERT_EQ(tdbSetDup(pDb1, 1), 0); tdbDbClose(pDb);
// GTEST_ASSERT_EQ(tdbGetDup(pDb1), 1); // Close Env
ret = tdbEnvClose(pEnv);
// GTEST_ASSERT_EQ(tdbSetCmprFunc(pDb1, NULL), 0); GTEST_ASSERT_EQ(ret, 0);
tdbEnvBeginTxn(pEnv);
GTEST_ASSERT_EQ(tdbOpen(pDb1, "db.db", "db1", pEnv), 0);
// char *key = "key1";
// char *val = "value1";
// tdbInsert(pDb1, (void *)key, strlen(key), (void *)val, strlen(val));
tdbEnvCommit(pEnv);
#if 0
// Insert
// Query
// Delete
// Query
#endif
// GTEST_ASSERT_EQ(tdbOpen(&pDb2, "db.db", "db2", pEnv), 0);
// GTEST_ASSERT_EQ(tdbOpen(&pDb3, "index.db", NULL, pEnv), 0);
// tdbClose(pDb3);
// tdbClose(pDb2);
tdbClose(pDb1);
#endif
tdbEnvClose(pEnv);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册