提交 fd6b2ad6 编写于 作者: H Hongze Cheng

more refact

上级 8565e900
......@@ -96,7 +96,7 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
metaReaderInit(&pTbCur->mr, pMeta, 0);
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc);
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
return pTbCur;
}
......@@ -119,7 +119,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
STbCfg tbCfg;
for (;;) {
ret = tdbDbNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
ret = tdbDbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
if (ret < 0) {
return -1;
}
......@@ -193,7 +193,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
}
pCtbCur->suid = uid;
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur);
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) {
taosMemoryFree(pCtbCur);
return NULL;
......@@ -219,7 +219,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
int ret;
SCtbIdxKey *pCtbIdxKey;
ret = tdbDbNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
ret = tdbDbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
if (ret < 0) {
return 0;
}
......@@ -283,7 +283,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
while (true) {
// TODO: lock during iterate?
if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
pSmaIdxKey = pCur->pKey;
ASSERT(pSmaIdxKey != NULL);
......@@ -354,7 +354,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
tb_uid_t uid = 0;
while (true) {
// TODO: lock during iterate?
if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
ASSERT(pSmaIdxKey != NULL);
pSmaIdxKey = pCur->pKey;
......
......@@ -438,7 +438,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
while (true) {
// TODO: lock during iterate?
if (tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
if (tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, NULL, &pCur->vLen) == 0) {
pSmaIdxKey = pCur->pKey;
ASSERT(pSmaIdxKey != NULL);
......@@ -613,7 +613,7 @@ int64_t metaSmaCursorNext(SMSmaCursor *pCur) {
void *pBuf;
SSmaIdxKey *smaIdxKey;
ret = tdbDbNext(pCur->pCur, &pCur->pKey, &pCur->kLen, &pCur->pVal, &pCur->vLen);
ret = tdbDbcNext(pCur->pCur, &pCur->pKey, &pCur->kLen, &pCur->pVal, &pCur->vLen);
if (ret < 0) {
return 0;
}
......
......@@ -31,7 +31,7 @@ typedef struct STDBC TDBC;
typedef struct STxn TXN;
// TENV
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv);
int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv);
int tdbEnvClose(TENV *pEnv);
int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv, TXN *pTxn);
......@@ -45,10 +45,18 @@ int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// TDBC
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc);
int tdbDbNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
#define TDB_FLG_BACKWD 0x1 // backward search
#define TDB_FLG_CMP_LT 0x2 // less than
#define TDB_FLG_CMP_EQ 0x4 // equal
#define TDB_FLG_CMP_GT 0x8 // greater than
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn);
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, tdb_cmpr_fn_t cmprFn, int flags);
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen);
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbDbcDrop(TDBC *pDbc);
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbDbcClose(TDBC *pDbc);
int tdbDbcInsert(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen);
// TXN
#define TDB_TXN_WRITE 0x1
......@@ -69,6 +77,9 @@ struct STxn {
void *xArg;
};
// error code
enum { TDB_CODE_SUCCESS = 0, TDB_CODE_MAX };
#ifdef __cplusplus
}
#endif
......
......@@ -67,8 +67,8 @@ typedef struct {
u8 *pBuf;
} SCellDecoder;
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
......@@ -1504,4 +1504,121 @@ void tdbBtPageInfo(SPage *pPage, int idx) {
pBtPageInfo->nOvfl = pPage->nOverflow;
}
#endif
// TDB_BTREE_DEBUG
\ No newline at end of file
// TDB_BTREE_DEBUG
int tdbBtcMoveTo2(SBTC *pBtc, const void *pKey, int kLen, tdb_cmpr_fn_t cmprFn, int flags) {
SBTree *pBt = pBtc->pBt;
SPager *pPager = pBt->pPager;
SPgno root = pBt->root;
SCellDecoder cd = {0};
int nCells = 0;
SCell *pCell = NULL;
int ret = 0;
int c;
int backward = flags & TDB_FLG_BACKWD;
if (cmprFn == NULL) {
cmprFn = pBt->kcmpr;
}
// move cursor to a level
if (pBtc->iPage < 0) {
// move from clear cursor
ret = tdbPagerFetchPage(pPager, &root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
pBtc->iPage = 0;
pBtc->idx = -1;
// for empty tree, just return with an invalid position
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0;
} else {
// move from a position (TODO)
}
// search downward
for (;;) {
int lidx, ridx, midx;
SPage *pPage;
pPage = pBtc->pPage;
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0;
ridx = nCells - 1;
ASSERT(nCells > 0);
ASSERT(pBtc->idx == -1);
// compare first cell
midx = lidx;
pCell = tdbPageGetCell(pPage, midx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = cmprFn(pKey, kLen, cd.pKey, cd.kLen);
if (c <= 0) {
ridx = lidx - 1;
} else {
lidx = lidx + 1;
}
// compare last cell
if (lidx <= ridx) {
midx = ridx;
pCell = tdbPageGetCell(pPage, midx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = cmprFn(pKey, kLen, cd.pKey, cd.kLen);
if (c >= 0) {
lidx = ridx + 1;
} else {
ridx = ridx - 1;
}
}
// binary search
for (;;) {
if (lidx > ridx) break;
midx = (lidx + ridx) >> 1;
pCell = tdbPageGetCell(pPage, midx);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
ASSERT(0);
return -1;
}
// Compare the key values
c = cmprFn(pKey, kLen, cd.pKey, cd.kLen);
if (c < 0) {
// pKey < cd.pKey
ridx = midx - 1;
} else if (c > 0) {
// pKey > cd.pKey
lidx = midx + 1;
} else {
// pKey == cd.pKey
break;
}
}
// keep search downward or break
if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
pBtc->idx = midx;
// *pCRst = c;
break;
} else {
if (c <= 0) {
pBtc->idx = midx;
} else {
pBtc->idx = midx + 1;
}
tdbBtcMoveDownward(pBtc);
}
}
return 0;
}
\ No newline at end of file
......@@ -87,7 +87,7 @@ int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, vo
return tdbBtreePGet(pDb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen);
}
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn) {
int ret;
TDBC *pDbc = NULL;
......@@ -97,7 +97,7 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
return -1;
}
tdbBtcOpen(&pDbc->btc, pDb->pBt, NULL);
tdbBtcOpen(&pDbc->btc, pDb->pBt, pTxn);
// TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs.
......@@ -111,7 +111,39 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
return 0;
}
int tdbDbNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, tdb_cmpr_fn_t cmprFn, int flags) {
int tflags;
// set/check flags
if (flags == 0) {
flags |= TDB_FLG_CMP_EQ;
} else {
tflags = flags & (TDB_FLG_CMP_LT | TDB_FLG_CMP_EQ | TDB_FLG_CMP_GT);
if (tflags != TDB_FLG_CMP_LT && tflags != TDB_FLG_CMP_EQ && tflags != TDB_FLG_CMP_GT) return -1;
}
return tdbBtcMoveTo2(&pDbc->btc, pKey, kLen, cmprFn, flags);
}
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen) {
// TODO
ASSERT(0);
return 0;
}
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen) {
// TODO
ASSERT(0);
return 0;
}
int tdbDbcDrop(TDBC *pDbc) {
// TODO
ASSERT(0);
return 0;
}
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return tdbBtreeNext(&pDbc->btc, ppKey, kLen, ppVal, vLen);
}
......@@ -122,9 +154,3 @@ int tdbDbcClose(TDBC *pDbc) {
return 0;
}
int tdbDbcInsert(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen) {
// TODO
ASSERT(0);
return 0;
}
\ No newline at end of file
......@@ -15,7 +15,7 @@
#include "tdbInt.h"
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) {
TENV *pEnv;
int dsize;
int zsize;
......@@ -49,7 +49,7 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
pEnv->jfd = -1;
ret = tdbPCacheOpen(pageSize, cacheSize, &(pEnv->pCache));
ret = tdbPCacheOpen(szPage, pages, &(pEnv->pCache));
if (ret < 0) {
return -1;
}
......
......@@ -135,7 +135,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// 1. Search the hash table
pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash];
while (pPage) {
if (TDB_IS_SAME_PAGE(&(pPage->pgid), pPgid)) break;
if (memcmp(pPage->pgid.fileid, pPgid->fileid, TDB_FILE_ID_LEN) == 0 && pPage->pgid.pgno == pPgid->pgno) break;
pPage = pPage->pHashNext;
}
......
......@@ -33,7 +33,6 @@ typedef uint64_t u64;
// SPgno
typedef u32 SPgno;
#define TDB_IVLD_PGNO ((pgno_t)0)
#include "tdbOs.h"
#include "tdbUtil.h"
......@@ -57,38 +56,12 @@ typedef struct {
SPgno pgno;
} SPgid;
#define TDB_IVLD_PGID (SPgid){0, TDB_IVLD_PGNO};
static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
SPgid *pgid1 = (SPgid *)p1;
SPgid *pgid2 = (SPgid *)p2;
int rcode;
rcode = memcmp(pgid1->fileid, pgid2->fileid, TDB_FILE_ID_LEN);
if (rcode) {
return rcode;
} else {
if (pgid1->pgno > pgid2->pgno) {
return 1;
} else if (pgid1->pgno < pgid2->pgno) {
return -1;
} else {
return 0;
}
}
}
#define TDB_IS_SAME_PAGE(pPgid1, pPgid2) (tdbCmprPgId(pPgid1, pPgid2) == 0)
// pgsz_t
#define TDB_MIN_PGSIZE 512 // 512B
#define TDB_MAX_PGSIZE 16777216 // 16M
#define TDB_DEFAULT_PGSIZE 4096
#define TDB_IS_PGSIZE_VLD(s) (((s) >= TDB_MIN_PGSIZE) && ((s) <= TDB_MAX_PGSIZE))
// cache
#define TDB_DEFAULT_CACHE_SIZE (256 * 4096) // 1M
// dbname
#define TDB_MAX_DBNAME_LEN 24
......@@ -98,8 +71,6 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
#define TDB_FILENAME_LEN 128
#define TDB_DEFAULT_FANOUT 6
#define BTREE_MAX_DEPTH 20
#define TDB_FLAG_IS(flags, flag) ((flags) == (flag))
......@@ -152,27 +123,13 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
// SBTC
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn);
int tdbBtcMoveTo2(SBTC *pBtc, const void *pKey, int kLen, tdb_cmpr_fn_t cmprFn, int flags);
int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcClose(SBTC *pBtc);
// tdbPager.c ====================================
struct SPager {
char *dbFileName;
char *jFileName;
int pageSize;
uint8_t fid[TDB_FILE_ID_LEN];
tdb_fd_t fd;
tdb_fd_t jfd;
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
u8 inTran;
SPager *pNext; // used by TENV
SPager *pHashNext; // used by TENV
};
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager);
......@@ -346,6 +303,22 @@ struct STEnv {
SPager **pgrHash;
};
struct SPager {
char *dbFileName;
char *jFileName;
int pageSize;
uint8_t fid[TDB_FILE_ID_LEN];
tdb_fd_t fd;
tdb_fd_t jfd;
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
u8 inTran;
SPager *pNext; // used by TENV
SPager *pHashNext; // used by TENV
};
#ifdef __cplusplus
}
#endif
......
......@@ -199,11 +199,11 @@ TEST(tdb_test, simple_test) {
int vLen, kLen;
int count = 0;
ret = tdbDbcOpen(pDb, &pDBC);
ret = tdbDbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
for (;;) {
ret = tdbDbNext(pDBC, &pKey, &kLen, &pVal, &vLen);
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
......@@ -280,11 +280,11 @@ TEST(tdb_test, simple_test2) {
int vLen, kLen;
int count = 0;
ret = tdbDbcOpen(pDb, &pDBC);
ret = tdbDbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
for (;;) {
ret = tdbDbNext(pDBC, &pKey, &kLen, &pVal, &vLen);
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册