提交 ef684076 编写于 作者: H Hongze Cheng

refact: tdb api

上级 eb661288
......@@ -210,7 +210,7 @@ struct SMetaReader {
};
struct SMTbCursor {
TDBC *pDbc;
TBC *pDbc;
void *pKey;
void *pVal;
int kLen;
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
// metaDebug ==================
// clang-format off
......@@ -63,16 +63,16 @@ struct SMeta {
char* path;
SVnode* pVnode;
TENV* pEnv;
TDB* pEnv;
TXN txn;
TDB* pTbDb;
TDB* pSkmDb;
TDB* pUidIdx;
TDB* pNameIdx;
TDB* pCtbIdx;
TDB* pTagIdx;
TDB* pTtlIdx;
TDB* pSmaIdx;
TTB* pTbDb;
TTB* pSkmDb;
TTB* pUidIdx;
TTB* pNameIdx;
TTB* pCtbIdx;
TTB* pTagIdx;
TTB* pTtlIdx;
TTB* pSmaIdx;
SMetaIdx* pIdx;
};
......@@ -118,7 +118,7 @@ typedef struct {
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
#endif
#ifdef __cplusplus
......
......@@ -46,7 +46,7 @@ struct SSmaEnv {
TXN txn;
void *pPool; // SPoolMem
SDiskID did;
TENV *dbEnv; // TODO: If it's better to put it in smaIndex level?
TDB *dbEnv; // TODO: If it's better to put it in smaIndex level?
char *path; // relative path
SSmaStat *pStat;
};
......@@ -93,16 +93,16 @@ typedef struct SDBFile SDBFile;
struct SDBFile {
int32_t fid;
TDB *pDB;
TTB *pDB;
char *path;
};
int32_t tdSmaBeginCommit(SSmaEnv *pEnv);
int32_t tdSmaEndCommit(SSmaEnv *pEnv);
int32_t smaOpenDBEnv(TENV **ppEnv, const char *path);
int32_t smaCloseDBEnv(TENV *pEnv);
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF);
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path);
int32_t smaCloseDBEnv(TDB *pEnv);
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF);
int32_t smaCloseDBF(SDBFile *pDBF);
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn);
void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_t *valLen);
......
......@@ -50,63 +50,63 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
taosMkDir(pMeta->path);
// open env
ret = tdbEnvOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv);
ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv);
if (ret < 0) {
metaError("vgId:%d failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTbDb
ret = tdbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSkmDb
ret = tdbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pUidIdx
ret = tdbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pNameIdx
ret = tdbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pCtbIdx
ret = tdbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTagIdx
ret = tdbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTtlIdx
ret = tdbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSmaIdx
ret = tdbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
......@@ -125,15 +125,15 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
return -1;
......@@ -142,15 +142,15 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
}
......
......@@ -35,7 +35,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
if (tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _err;
}
......@@ -58,7 +58,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
int64_t version;
// query uid.idx
if (tdbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......@@ -72,7 +72,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
tb_uid_t uid;
// query name.idx
if (tdbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
......@@ -100,9 +100,9 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
metaReaderInit(&pTbCur->mr, pMeta, 0);
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
tdbDbcMoveToFirst(pTbCur->pDbc);
tdbTbcMoveToFirst(pTbCur->pDbc);
return pTbCur;
}
......@@ -113,7 +113,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
tdbFree(pTbCur->pVal);
metaReaderClear(&pTbCur->mr);
if (pTbCur->pDbc) {
tdbDbcClose(pTbCur->pDbc);
tdbTbcClose(pTbCur->pDbc);
}
taosMemoryFree(pTbCur);
}
......@@ -125,7 +125,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
STbCfg tbCfg;
for (;;) {
ret = tdbDbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
ret = tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
if (ret < 0) {
return -1;
}
......@@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
metaRLock(pMeta);
ret = tdbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
ret = tdbTbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
metaULock(pMeta);
if (ret < 0) {
return NULL;
......@@ -184,7 +184,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
struct SMCtbCursor {
SMeta *pMeta;
TDBC *pCur;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
......@@ -207,7 +207,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
pCtbCur->suid = uid;
metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
......@@ -217,9 +217,9 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
// move to the suid
ctbIdxKey.suid = uid;
ctbIdxKey.uid = INT64_MIN;
tdbDbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
if (c > 0) {
tdbDbcMoveToNext(pCtbCur->pCur);
tdbTbcMoveToNext(pCtbCur->pCur);
}
return pCtbCur;
......@@ -229,7 +229,7 @@ void metaCloseCtbCursor(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta);
if (pCtbCur->pCur) {
tdbDbcClose(pCtbCur->pCur);
tdbTbcClose(pCtbCur->pCur);
tdbFree(pCtbCur->pKey);
tdbFree(pCtbCur->pVal);
......@@ -243,7 +243,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
int ret;
SCtbIdxKey *pCtbIdxKey;
ret = tdbDbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
ret = tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
if (ret < 0) {
return 0;
}
......@@ -299,7 +299,7 @@ int metaGetTbNum(SMeta *pMeta) {
typedef struct {
SMeta *pMeta;
TDBC *pCur;
TBC *pCur;
tb_uid_t uid;
void *pKey;
void *pVal;
......@@ -323,7 +323,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
pSmaCur->uid = uid;
metaRLock(pMeta);
ret = tdbDbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pSmaCur);
......@@ -333,9 +333,9 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
// move to the suid
smaIdxKey.uid = uid;
smaIdxKey.smaUid = INT64_MIN;
tdbDbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
if (c > 0) {
tdbDbcMoveToNext(pSmaCur->pCur);
tdbTbcMoveToNext(pSmaCur->pCur);
}
return pSmaCur;
......@@ -345,7 +345,7 @@ void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
if (pSmaCur) {
if (pSmaCur->pMeta) metaULock(pSmaCur->pMeta);
if (pSmaCur->pCur) {
tdbDbcClose(pSmaCur->pCur);
tdbTbcClose(pSmaCur->pCur);
tdbFree(pSmaCur->pKey);
tdbFree(pSmaCur->pVal);
......@@ -359,7 +359,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
int ret;
SSmaIdxKey *pSmaIdxKey;
ret = tdbDbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
ret = tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
if (ret < 0) {
return 0;
}
......
......@@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
......@@ -130,17 +130,17 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
return tdbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
......
......@@ -71,9 +71,9 @@ _err:
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TDBC *pNameIdxc = NULL;
TDBC *pUidIdxc = NULL;
TDBC *pCtbIdxc = NULL;
TBC *pNameIdxc = NULL;
TBC *pUidIdxc = NULL;
TBC *pCtbIdxc = NULL;
SCtbIdxKey *pCtbIdxKey;
const void *pKey = NULL;
int nKey;
......@@ -82,43 +82,43 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
int c, ret;
// prepare uid idx cursor
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c != 0) {
terrno = TSDB_CODE_VND_TB_NOT_EXIST;
tdbDbcClose(pUidIdxc);
tdbTbcClose(pUidIdxc);
goto _err;
}
// prepare name idx cursor
tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || c != 0) {
ASSERT(0);
}
tdbDbcDelete(pUidIdxc);
tdbDbcDelete(pNameIdxc);
tdbDbcClose(pUidIdxc);
tdbDbcClose(pNameIdxc);
tdbTbcDelete(pUidIdxc);
tdbTbcDelete(pNameIdxc);
tdbTbcClose(pUidIdxc);
tdbTbcClose(pNameIdxc);
// loop to drop each child table
tdbDbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || (c < 0 && tdbDbcMoveToNext(pCtbIdxc) < 0)) {
tdbDbcClose(pCtbIdxc);
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || (c < 0 && tdbTbcMoveToNext(pCtbIdxc) < 0)) {
tdbTbcClose(pCtbIdxc);
goto _exit;
}
for (;;) {
tdbDbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL);
tdbTbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL);
pCtbIdxKey = (SCtbIdxKey *)pKey;
if (pCtbIdxKey->suid > pReq->suid) break;
// drop the child table (TODO)
if (tdbDbcMoveToNext(pCtbIdxc) < 0) break;
if (tdbTbcMoveToNext(pCtbIdxc) < 0) break;
}
_exit:
......@@ -134,8 +134,8 @@ _err:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TDBC *pUidIdxc = NULL;
TDBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
......@@ -143,14 +143,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int32_t ret;
int32_t c;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
......@@ -158,11 +158,11 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
oversion = *(int64_t *)pData;
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(ret == 0 && c == 0);
ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ASSERT(ret == 0);
tDecoderInit(&dc, pData, nData);
......@@ -191,12 +191,12 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
tdbDbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaULock(pMeta);
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return 0;
}
......@@ -256,9 +256,9 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
TDBC *pTbDbc = NULL;
TDBC *pUidIdxc = NULL;
TDBC *pNameIdxc = NULL;
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
const void *pData;
int nData;
tb_uid_t uid;
......@@ -271,15 +271,15 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
int c = 0, ret;
// search & delete the name idx
tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || !tdbDbcIsValid(pNameIdxc) || c) {
tdbDbcClose(pNameIdxc);
tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || !tdbTbcIsValid(pNameIdxc) || c) {
tdbTbcClose(pNameIdxc);
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
ret = tdbDbcGet(pNameIdxc, NULL, NULL, &pData, &nData);
ret = tdbTbcGet(pNameIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
......@@ -287,36 +287,36 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
uid = *(tb_uid_t *)pData;
tdbDbcDelete(pNameIdxc);
tdbDbcClose(pNameIdxc);
tdbTbcDelete(pNameIdxc);
tdbTbcClose(pNameIdxc);
// search & delete uid idx
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
tver = *(int64_t *)pData;
tdbDbcDelete(pUidIdxc);
tdbDbcClose(pUidIdxc);
tdbTbcDelete(pUidIdxc);
tdbTbcClose(pUidIdxc);
// search and get meta entry
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbDbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbTbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
......@@ -345,21 +345,21 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
taosMemoryFree(pDataCopy);
tDecoderClear(&coder);
tdbDbcClose(pTbDbc);
tdbTbcClose(pTbDbc);
if (type == TSDB_CHILD_TABLE) {
// remove the pCtbIdx
TDBC *pCtbIdxc = NULL;
tdbDbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
TBC *pCtbIdxc = NULL;
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c);
ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
tdbDbcDelete(pCtbIdxc);
tdbDbcClose(pCtbIdxc);
tdbTbcDelete(pCtbIdxc);
tdbTbcClose(pCtbIdxc);
// remove tags from pTagIdx (todo)
} else if (type == TSDB_NORMAL_TABLE) {
......@@ -389,7 +389,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
int c;
// search name index
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
......@@ -400,22 +400,22 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
pVal = NULL;
// search uid index
TDBC *pUidIdxc = NULL;
TBC *pUidIdxc = NULL;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
// search table.db
TDBC *pTbDbc = NULL;
TBC *pTbDbc = NULL;
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
// get table entry
SDecoder dc = {0};
......@@ -505,21 +505,21 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbDbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaSaveToSkmDb(pMeta, &entry);
metaULock(pMeta);
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return 0;
_err:
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return -1;
}
......@@ -536,7 +536,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
int nData = 0;
// search name index
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
......@@ -547,24 +547,24 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
pVal = NULL;
// search uid index
TDBC *pUidIdxc = NULL;
TBC *pUidIdxc = NULL;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
// search table.db
TDBC *pTbDbc = NULL;
TBC *pTbDbc = NULL;
SDecoder dc = {0};
/* get ctbEntry */
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ctbEntry.pBuf = taosMemoryMalloc(nData);
memcpy(ctbEntry.pBuf, pData, nData);
......@@ -573,9 +573,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc);
/* get stbEntry*/
tdbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal);
tDecoderInit(&dc, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc, &stbEntry);
......@@ -632,19 +632,19 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaSaveToTbDb(pMeta, &ctbEntry);
// save to uid.idx
tdbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return 0;
_err:
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc);
return -1;
}
......@@ -708,7 +708,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
......@@ -721,11 +721,11 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
......@@ -748,12 +748,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
ttlKey.uid = pME->uid;
return tdbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
}
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
......@@ -801,10 +801,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SDecoder dc = {0};
// get super table
tdbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData;
tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &stbEntry);
......@@ -817,7 +817,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
&pTagIdxKey, &nTagIdxKey) < 0) {
return -1;
}
tdbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc);
......@@ -859,7 +859,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW);
if (tdbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
rcode = -1;
goto _exit;
}
......
......@@ -17,12 +17,12 @@
#include "sma.h"
int32_t smaOpenDBEnv(TENV **ppEnv, const char *path) {
int32_t smaOpenDBEnv(TDB **ppEnv, const char *path) {
int ret = 0;
if (path == NULL) return -1;
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
ret = tdbOpen(path, 4096, 256, ppEnv); // use as param
if (ret != 0) {
smaError("failed to create tsdb db env, ret = %d", ret);
......@@ -32,7 +32,7 @@ int32_t smaOpenDBEnv(TENV **ppEnv, const char *path) {
return 0;
}
int32_t smaCloseDBEnv(TENV *pEnv) { return tdbEnvClose(pEnv); }
int32_t smaCloseDBEnv(TDB *pEnv) { return tdbClose(pEnv); }
static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
......@@ -54,21 +54,21 @@ static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int
return 0;
}
static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
static int32_t smaOpenDBDb(TTB **ppDB, TDB *pEnv, const char *pFName) {
tdb_cmpr_fn_t compFunc;
// Create a database
compFunc = tdSmaKeyCmpr;
if (tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
if (tdbTbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
return -1;
}
return 0;
}
static int32_t smaCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
static int32_t smaCloseDBDb(TTB *pDB) { return tdbTbClose(pDB); }
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
int32_t smaOpenDBF(TDB *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
if (!pEnv || !pDBF) {
terrno = TSDB_CODE_INVALID_PTR;
......@@ -99,7 +99,7 @@ int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, in
int32_t ret;
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
ret = tdbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbTbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
smaError("failed to upsert tsma data into db, ret = %d", ret);
return -1;
......@@ -112,7 +112,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_
void *pVal = NULL;
int ret;
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbTbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
smaError("failed to get tsma data from db, ret = %d", ret);
......
......@@ -55,7 +55,6 @@ typedef enum {
SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma
} ESmaStorageLevel;
// static func
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
......@@ -69,18 +68,15 @@ static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t f
static int32_t tdInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyLen, void *pData, int32_t dataLen,
TXN *txn);
TXN *txn);
// expired window
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version);
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
// read data
// implementation
/**
......@@ -157,7 +153,6 @@ static bool tdSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
return false;
}
/**
* @brief Approximate value for week/month/year.
*
......@@ -239,9 +234,8 @@ static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit,
return interval;
}
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
int8_t intervalUnit) {
int8_t intervalUnit) {
pSmaH->pSma = pSma;
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
pSmaH->pDataBlocks = pDataBlocks;
......@@ -493,11 +487,12 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
smaCloseDBF(&tSmaH.dFile);
}
tdSetTSmaDataFile(&tSmaH, indexUid, fid);
smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32 " queryKey:%" PRIi64,
smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32
" queryKey:%" PRIi64,
SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey);
if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma),
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tdDestroyTSmaWriteH(&tSmaH);
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
......@@ -523,9 +518,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
tdResetExpiredWindow(pSma, pStat, indexUid, skey);
} else {
smaWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64,
SMA_VID(pSma), skey, tlen, indexUid);
SMA_VID(pSma), skey, tlen, indexUid);
}
}
}
tdSmaEndCommit(pEnv); // TODO: not commit for every insert
......@@ -557,7 +551,7 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
TXN *txn) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert tsma data blocks into B+Tree(TDB)
// TODO: insert tsma data blocks into B+Tree(TTB)
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
smaWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
......@@ -600,12 +594,12 @@ static int32_t tdResetExpiredWindow(SSma *pSma, SSmaStat *pStat, int64_t indexUi
if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) {
// error handling
tdUnRefSmaStat(pSma, pStat);
smaWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", SMA_VID(pSma),
skey, indexUid);
smaWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", SMA_VID(pSma), skey,
indexUid);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", SMA_VID(pSma),
skey, indexUid);
skey, indexUid);
// TODO: use a standalone interface to received state upate notification from stream computing module.
/**
* @brief state
......@@ -666,8 +660,7 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid) {
smaDebug("vgId:%d wait 1s to drop index %" PRIi64 " since refVal=%d", SMA_VID(pSma), indexUid, refVal);
taosSsleep(1);
if (++nSleep > SMA_DROP_EXPIRED_TIME) {
smaDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", SMA_VID(pSma), indexUid, nSleep,
refVal);
smaDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", SMA_VID(pSma), indexUid, nSleep, refVal);
break;
};
}
......@@ -730,17 +723,17 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
tdUnRefSmaStat(pSma, pStat);
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
smaWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, SMA_VID(pSma), indexUid,
tstrerror(terrno), smaStat);
tstrerror(terrno), smaStat);
return TSDB_CODE_FAILED;
}
if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) {
// TODO: mark this window as expired.
smaDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, SMA_VID(pSma),
querySKey, indexUid);
smaDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
} else {
smaDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, SMA_VID(pSma), querySKey,
indexUid);
indexUid);
}
STSma *pTSma = pItem->pTSma;
......@@ -755,7 +748,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
tdInitTSmaFile(&tReadH, indexUid, querySKey);
smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
return TSDB_CODE_FAILED;
......@@ -766,18 +759,18 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
int64_t queryGroupId = 0;
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma),
tReadH.dFile.path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), tReadH.dFile.path,
*(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), SMA_KEY_LEN);
void *result = NULL;
int32_t valueSize = 0;
if (!(result = smaGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) {
smaWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s",
SMA_VID(pSma), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
SMA_VID(pSma), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno));
smaCloseDBF(&tReadH.dFile);
return TSDB_CODE_FAILED;
}
#endif
#endif
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
for (uint32_t v = 0; v < valueSize; v += 8) {
......@@ -878,7 +871,7 @@ static SSmaStatItem *tdNewSmaStatItem(int8_t state) {
}
static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version) {
int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (!pItem) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
......@@ -923,17 +916,15 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
taosMemoryFreeClear(pItem->pTSma);
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
smaWarn("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", SMA_VID(pSma), indexUid,
winSKey);
winSKey);
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", SMA_VID(pSma), indexUid,
winSKey);
winSKey);
return TSDB_CODE_SUCCESS;
}
/**
* @brief Update expired window according to msg from stream computing module.
*
......@@ -1035,7 +1026,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
}
} else {
smaDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window ignore as duplicated",
SMA_VID(pSma), pTSma->indexUid, winSKey);
SMA_VID(pSma), pTSma->indexUid, winSKey);
}
}
}
......@@ -1044,4 +1035,3 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
return TSDB_CODE_SUCCESS;
}
......@@ -53,7 +53,7 @@ struct SSmaEnv {
TXN txn;
SPoolMem *pPool;
SDiskID did;
TENV *dbEnv; // TODO: If it's better to put it in smaIndex level?
TDB *dbEnv; // TODO: If it's better to put it in smaIndex level?
char *path; // relative path
SSmaStat *pStat;
};
......@@ -876,7 +876,7 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t ke
TXN *txn) {
SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert tsma data blocks into B+Tree(TDB)
// TODO: insert tsma data blocks into B+Tree(TTB)
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
tsdbWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
......
......@@ -17,12 +17,12 @@
#include "tsdb.h"
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) {
int32_t tsdbOpenDBEnv(TDB **ppEnv, const char *path) {
int ret = 0;
if (path == NULL) return -1;
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
ret = tdbOpen(path, 4096, 256, ppEnv); // use as param
if (ret != 0) {
tsdbError("Failed to create tsdb db env, ret = %d", ret);
......@@ -32,7 +32,7 @@ int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) {
return 0;
}
int32_t tsdbCloseDBEnv(TENV *pEnv) { return tdbEnvClose(pEnv); }
int32_t tsdbCloseDBEnv(TDB *pEnv) { return tdbClose(pEnv); }
static inline int tsdbSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int len2) {
const SSmaKey *pKey1 = (const SSmaKey *)arg1;
......@@ -54,20 +54,20 @@ static inline int tsdbSmaKeyCmpr(const void *arg1, int len1, const void *arg2, i
return 0;
}
static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
static int32_t tsdbOpenDBDb(TTB **ppDB, TDB *pEnv, const char *pFName) {
int ret;
tdb_cmpr_fn_t compFunc;
// Create a database
compFunc = tsdbSmaKeyCmpr;
ret = tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
ret = tdbTbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
return 0;
}
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
static int32_t tsdbCloseDBDb(TTB *pDB) { return tdbTbClose(pDB); }
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) {
int32_t tsdbOpenDBF(TDB *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
if (!pEnv || !pDBF) {
terrno = TSDB_CODE_INVALID_PTR;
......@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret;
ret = tdbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbTbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1;
......@@ -110,7 +110,7 @@ void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32
void *pVal = NULL;
int ret;
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbTbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
tsdbError("Failed to get sma data from db, ret = %d", ret);
......
......@@ -7,7 +7,7 @@ target_sources(tdb
"src/db/tdbUtil.c"
"src/db/tdbBtree.c"
"src/db/tdbDb.c"
"src/db/tdbEnv.c"
"src/db/tdbTable.c"
"src/db/tdbTxn.c"
"src/db/tdbPage.c"
"src/db/tdbOs.c"
......
......@@ -25,40 +25,40 @@ extern "C" {
typedef int (*tdb_cmpr_fn_t)(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
// exposed types
typedef struct STEnv TENV;
typedef struct STDB TDB;
typedef struct STDBC TDBC;
typedef struct STxn TXN;
// TENV
int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv);
int tdbEnvClose(TENV *pEnv);
int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv, TXN *pTxn);
typedef struct STDB TDB;
typedef struct STTB TTB;
typedef struct STBC TBC;
typedef struct STxn TXN;
// TDB
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbOpen(const char *rootDir, int szPage, int pages, TDB **ppDb);
int tdbClose(TDB *pDb);
int tdbDrop(TDB *pDb);
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
int tdbBegin(TDB *pDb, TXN *pTxn);
int tdbCommit(TDB *pDb, TXN *pTxn);
// TTB
int tdbTbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb);
int tdbTbClose(TTB *pTb);
int tdbTbDrop(TTB *pTb);
int tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbTbDelete(TTB *pTb, const void *pKey, int kLen, TXN *pTxn);
int tdbTbUpsert(TTB *pTb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbTbGet(TTB *pTb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbTbPGet(TTB *pTb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// TDBC
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn);
int tdbDbcClose(TDBC *pDbc);
int tdbDbcIsValid(TDBC *pDbc);
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, int *c);
int tdbDbcMoveToFirst(TDBC *pDbc);
int tdbDbcMoveToLast(TDBC *pDbc);
int tdbDbcMoveToNext(TDBC *pDbc);
int tdbDbcMoveToPrev(TDBC *pDbc);
int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen);
int tdbDbcDelete(TDBC *pDbc);
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbDbcUpsert(TDBC *pDbc, const void *pKey, int nKey, const void *pData, int nData, int insert);
// TBC
int tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn);
int tdbTbcClose(TBC *pTbc);
int tdbTbcIsValid(TBC *pTbc);
int tdbTbcMoveTo(TBC *pTbc, const void *pKey, int kLen, int *c);
int tdbTbcMoveToFirst(TBC *pTbc);
int tdbTbcMoveToLast(TBC *pTbc);
int tdbTbcMoveToNext(TBC *pTbc);
int tdbTbcMoveToPrev(TBC *pTbc);
int tdbTbcGet(TBC *pTbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen);
int tdbTbcDelete(TBC *pTbc);
int tdbTbcNext(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, int nData, int insert);
// TXN
#define TDB_TXN_WRITE 0x1
......
......@@ -15,134 +15,164 @@
#include "tdbInt.h"
struct STDB {
TENV *pEnv;
SBTree *pBt;
};
struct STDBC {
SBTC btc;
};
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) {
TDB *pDb;
SPager *pPager;
int ret;
char fFullName[TDB_FILENAME_LEN];
SPage *pPage;
SPgno pgno;
int tdbOpen(const char *rootDir, int szPage, int pages, TDB **ppDb) {
TDB *pDb;
int dsize;
int zsize;
int tsize;
u8 *pPtr;
int ret;
*ppDb = NULL;
pDb = (TDB *)tdbOsCalloc(1, sizeof(*pDb));
if (pDb == NULL) {
dsize = strlen(rootDir);
zsize = sizeof(*pDb) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3;
pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
if (pPtr == NULL) {
return -1;
}
// pDb->pEnv
pDb->pEnv = pEnv;
pPager = tdbEnvGetPager(pEnv, fname);
if (pPager == NULL) {
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->rootDir, fname);
ret = tdbPagerOpen(pEnv->pCache, fFullName, &pPager);
if (ret < 0) {
return -1;
}
tdbEnvAddPager(pEnv, pPager);
pDb = (TDB *)pPtr;
pPtr += sizeof(*pDb);
// pDb->rootDir
pDb->rootDir = pPtr;
memcpy(pDb->rootDir, rootDir, dsize);
pDb->rootDir[dsize] = '\0';
pPtr = pPtr + dsize + 1;
// pDb->jfname
pDb->jfname = pPtr;
memcpy(pDb->jfname, rootDir, dsize);
pDb->jfname[dsize] = '/';
memcpy(pDb->jfname + dsize + 1, TDB_JOURNAL_NAME, strlen(TDB_JOURNAL_NAME));
pDb->jfname[dsize + 1 + strlen(TDB_JOURNAL_NAME)] = '\0';
pDb->jfd = -1;
ret = tdbPCacheOpen(szPage, pages, &(pDb->pCache));
if (ret < 0) {
return -1;
}
ASSERT(pPager != NULL);
// pDb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, keyCmprFn, &(pDb->pBt));
if (ret < 0) {
pDb->nPgrHash = 8;
tsize = sizeof(SPager *) * pDb->nPgrHash;
pDb->pgrHash = tdbOsMalloc(tsize);
if (pDb->pgrHash == NULL) {
return -1;
}
memset(pDb->pgrHash, 0, tsize);
mkdir(rootDir, 0755);
*ppDb = pDb;
return 0;
}
int tdbClose(TDB *pDb) {
SPager *pPager;
if (pDb) {
tdbBtreeClose(pDb->pBt);
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {
pDb->pgrList = pPager->pNext;
tdbPagerClose(pPager);
}
tdbPCacheClose(pDb->pCache);
tdbOsFree(pDb->pgrHash);
tdbOsFree(pDb);
}
return 0;
}
int tdbDrop(TDB *pDb) {
// TODO
return 0;
}
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
}
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn);
}
int tdbBegin(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen);
}
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
return tdbBtreePGet(pDb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen);
return 0;
}
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn) {
int ret;
TDBC *pDbc = NULL;
int tdbCommit(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;
*ppDbc = NULL;
pDbc = (TDBC *)tdbOsMalloc(sizeof(*pDbc));
if (pDbc == NULL) {
return -1;
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
tdbBtcOpen(&pDbc->btc, pDb->pBt, pTxn);
*ppDbc = pDbc;
return 0;
}
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, int *c) { return tdbBtcMoveTo(&pDbc->btc, pKey, kLen, c); }
SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
u32 hash;
SPager **ppPager;
int tdbDbcMoveToFirst(TDBC *pDbc) { return tdbBtcMoveToFirst(&pDbc->btc); }
hash = tdbCstringHash(fname);
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
}
int tdbDbcMoveToLast(TDBC *pDbc) { return tdbBtcMoveToLast(&pDbc->btc); }
return *ppPager;
}
int tdbDbcMoveToNext(TDBC *pDbc) { return tdbBtcMoveToNext(&pDbc->btc); }
void tdbEnvAddPager(TDB *pDb, SPager *pPager) {
u32 hash;
SPager **ppPager;
int tdbDbcMoveToPrev(TDBC *pDbc) { return tdbBtcMoveToPrev(&pDbc->btc); }
// rehash if neccessary
if (pDb->nPager + 1 > pDb->nPgrHash) {
// TODO
}
int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen) {
return tdbBtcGet(&pDbc->btc, ppKey, pkLen, ppVal, pvLen);
}
// add to list
pPager->pNext = pDb->pgrList;
pDb->pgrList = pPager;
int tdbDbcDelete(TDBC *pDbc) { return tdbBtcDelete(&pDbc->btc); }
// add to hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
pPager->pHashNext = *ppPager;
*ppPager = pPager;
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return tdbBtreeNext(&pDbc->btc, ppKey, kLen, ppVal, vLen);
// increase the counter
pDb->nPager++;
}
int tdbDbcUpsert(TDBC *pDbc, const void *pKey, int nKey, const void *pData, int nData, int insert) {
return tdbBtcUpsert(&pDbc->btc, pKey, nKey, pData, nData, insert);
}
void tdbEnvRemovePager(TDB *pDb, SPager *pPager) {
u32 hash;
SPager **ppPager;
int tdbDbcClose(TDBC *pDbc) {
if (pDbc) {
tdbBtcClose(&pDbc->btc);
tdbOsFree(pDbc);
// remove from the list
for (ppPager = &pDb->pgrList; *ppPager && (*ppPager != pPager); ppPager = &((*ppPager)->pNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
return 0;
}
// remove from hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
for (; *ppPager && *ppPager != pPager; ppPager = &((*ppPager)->pHashNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
// decrease the counter
pDb->nPager--;
int tdbDbcIsValid(TDBC *pDbc) { return tdbBtcIsValid(&pDbc->btc); }
\ No newline at end of file
// rehash if necessary
if (pDb->nPgrHash > 8 && pDb->nPager < pDb->nPgrHash / 2) {
// TODO
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
int tdbEnvOpen(const char *rootDir, int szPage, int pages, TENV **ppEnv) {
TENV *pEnv;
int dsize;
int zsize;
int tsize;
u8 *pPtr;
int ret;
*ppEnv = NULL;
dsize = strlen(rootDir);
zsize = sizeof(*pEnv) + dsize * 2 + strlen(TDB_JOURNAL_NAME) + 3;
pPtr = (uint8_t *)tdbOsCalloc(1, zsize);
if (pPtr == NULL) {
return -1;
}
pEnv = (TENV *)pPtr;
pPtr += sizeof(*pEnv);
// pEnv->rootDir
pEnv->rootDir = pPtr;
memcpy(pEnv->rootDir, rootDir, dsize);
pEnv->rootDir[dsize] = '\0';
pPtr = pPtr + dsize + 1;
// pEnv->jfname
pEnv->jfname = pPtr;
memcpy(pEnv->jfname, rootDir, dsize);
pEnv->jfname[dsize] = '/';
memcpy(pEnv->jfname + dsize + 1, TDB_JOURNAL_NAME, strlen(TDB_JOURNAL_NAME));
pEnv->jfname[dsize + 1 + strlen(TDB_JOURNAL_NAME)] = '\0';
pEnv->jfd = -1;
ret = tdbPCacheOpen(szPage, pages, &(pEnv->pCache));
if (ret < 0) {
return -1;
}
pEnv->nPgrHash = 8;
tsize = sizeof(SPager *) * pEnv->nPgrHash;
pEnv->pgrHash = tdbOsMalloc(tsize);
if (pEnv->pgrHash == NULL) {
return -1;
}
memset(pEnv->pgrHash, 0, tsize);
mkdir(rootDir, 0755);
*ppEnv = pEnv;
return 0;
}
int tdbEnvClose(TENV *pEnv) {
SPager *pPager;
if (pEnv) {
for (pPager = pEnv->pgrList; pPager; pPager = pEnv->pgrList) {
pEnv->pgrList = pPager->pNext;
tdbPagerClose(pPager);
}
tdbPCacheClose(pEnv->pCache);
tdbOsFree(pEnv->pgrHash);
tdbOsFree(pEnv);
}
return 0;
}
int tdbBegin(TENV *pEnv, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
return 0;
}
int tdbCommit(TENV *pEnv, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
return 0;
}
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname) {
u32 hash;
SPager **ppPager;
hash = tdbCstringHash(fname);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
}
return *ppPager;
}
void tdbEnvAddPager(TENV *pEnv, SPager *pPager) {
u32 hash;
SPager **ppPager;
// rehash if neccessary
if (pEnv->nPager + 1 > pEnv->nPgrHash) {
// TODO
}
// add to list
pPager->pNext = pEnv->pgrList;
pEnv->pgrList = pPager;
// add to hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
pPager->pHashNext = *ppPager;
*ppPager = pPager;
// increase the counter
pEnv->nPager++;
}
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager) {
u32 hash;
SPager **ppPager;
// remove from the list
for (ppPager = &pEnv->pgrList; *ppPager && (*ppPager != pPager); ppPager = &((*ppPager)->pNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
// remove from hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
for (; *ppPager && *ppPager != pPager; ppPager = &((*ppPager)->pHashNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
// decrease the counter
pEnv->nPager--;
// rehash if necessary
if (pEnv->nPgrHash > 8 && pEnv->nPager < pEnv->nPgrHash / 2) {
// TODO
}
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
struct STTB {
TDB *pEnv;
SBTree *pBt;
};
struct STBC {
SBTC btc;
};
int tdbTbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
TTB *pTb;
SPager *pPager;
int ret;
char fFullName[TDB_FILENAME_LEN];
SPage *pPage;
SPgno pgno;
*ppTb = NULL;
pTb = (TTB *)tdbOsCalloc(1, sizeof(*pTb));
if (pTb == NULL) {
return -1;
}
// pTb->pEnv
pTb->pEnv = pEnv;
pPager = tdbEnvGetPager(pEnv, fname);
if (pPager == NULL) {
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->rootDir, fname);
ret = tdbPagerOpen(pEnv->pCache, fFullName, &pPager);
if (ret < 0) {
return -1;
}
tdbEnvAddPager(pEnv, pPager);
}
ASSERT(pPager != NULL);
// pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, keyCmprFn, &(pTb->pBt));
if (ret < 0) {
return -1;
}
*ppTb = pTb;
return 0;
}
int tdbTbClose(TTB *pTb) {
if (pTb) {
tdbBtreeClose(pTb->pBt);
tdbOsFree(pTb);
}
return 0;
}
int tdbTbDrop(TTB *pTb) {
// TODO
return 0;
}
int tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pTb->pBt, pKey, keyLen, pVal, valLen, pTxn);
}
int tdbTbDelete(TTB *pTb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pTb->pBt, pKey, kLen, pTxn); }
int tdbTbUpsert(TTB *pTb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
return tdbBtreeUpsert(pTb->pBt, pKey, kLen, pVal, vLen, pTxn);
}
int tdbTbGet(TTB *pTb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pTb->pBt, pKey, kLen, ppVal, vLen);
}
int tdbTbPGet(TTB *pTb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
return tdbBtreePGet(pTb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen);
}
int tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn) {
int ret;
TBC *pTbc = NULL;
*ppTbc = NULL;
pTbc = (TBC *)tdbOsMalloc(sizeof(*pTbc));
if (pTbc == NULL) {
return -1;
}
tdbBtcOpen(&pTbc->btc, pTb->pBt, pTxn);
*ppTbc = pTbc;
return 0;
}
int tdbTbcMoveTo(TBC *pTbc, const void *pKey, int kLen, int *c) { return tdbBtcMoveTo(&pTbc->btc, pKey, kLen, c); }
int tdbTbcMoveToFirst(TBC *pTbc) { return tdbBtcMoveToFirst(&pTbc->btc); }
int tdbTbcMoveToLast(TBC *pTbc) { return tdbBtcMoveToLast(&pTbc->btc); }
int tdbTbcMoveToNext(TBC *pTbc) { return tdbBtcMoveToNext(&pTbc->btc); }
int tdbTbcMoveToPrev(TBC *pTbc) { return tdbBtcMoveToPrev(&pTbc->btc); }
int tdbTbcGet(TBC *pTbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen) {
return tdbBtcGet(&pTbc->btc, ppKey, pkLen, ppVal, pvLen);
}
int tdbTbcDelete(TBC *pTbc) { return tdbBtcDelete(&pTbc->btc); }
int tdbTbcNext(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return tdbBtreeNext(&pTbc->btc, ppKey, kLen, ppVal, vLen);
}
int tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, int nData, int insert) {
return tdbBtcUpsert(&pTbc->btc, pKey, nKey, pData, nData, insert);
}
int tdbTbcClose(TBC *pTbc) {
if (pTbc) {
tdbBtcClose(&pTbc->btc);
tdbOsFree(pTbc);
}
return 0;
}
int tdbTbcIsValid(TBC *pTbc) { return tdbBtcIsValid(&pTbc->btc); }
\ No newline at end of file
......@@ -103,9 +103,9 @@ typedef struct SPage SPage;
#define TDB_TXN_IS_READ_UNCOMMITTED(PTXN) ((PTXN)->flags & TDB_TXN_READ_UNCOMMITTED)
// tdbEnv.c ====================================
void tdbEnvAddPager(TENV *pEnv, SPager *pPager);
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager);
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname);
void tdbEnvAddPager(TDB *pEnv, SPager *pPager);
void tdbEnvRemovePager(TDB *pEnv, SPager *pPager);
SPager *tdbEnvGetPager(TDB *pEnv, const char *fname);
// tdbBtree.c ====================================
typedef struct SBTree SBTree;
......@@ -334,7 +334,7 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
return pCell;
}
struct STEnv {
struct STDB {
char *rootDir;
char *jfname;
int jfd;
......@@ -357,8 +357,8 @@ struct SPager {
SPgno dbOrigSize;
SPage *pDirty;
u8 inTran;
SPager *pNext; // used by TENV
SPager *pHashNext; // used by TENV
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
};
#ifdef __cplusplus
......
......@@ -117,8 +117,8 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
TEST(tdb_test, simple_insert1) {
int ret;
TENV *pEnv;
TDB *pDb;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1000000;
TXN txn;
......@@ -126,12 +126,12 @@ TEST(tdb_test, simple_insert1) {
taosRemoveDir("tdb");
// Open Env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
ret = tdbOpen("tdb", 4096, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
......@@ -152,7 +152,7 @@ TEST(tdb_test, simple_insert1) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
......@@ -181,7 +181,7 @@ TEST(tdb_test, simple_insert1) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbGet(pDb, key, strlen(key), &pVal, &vLen);
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
......@@ -193,19 +193,19 @@ TEST(tdb_test, simple_insert1) {
}
{ // Iterate to query the DB data
TDBC *pDBC;
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbDbcOpen(pDb, &pDBC, NULL);
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbDbcMoveToFirst(pDBC);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
......@@ -217,28 +217,28 @@ TEST(tdb_test, simple_insert1) {
GTEST_ASSERT_EQ(count, nData);
tdbDbcClose(pDBC);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
}
}
ret = tdbDrop(pDb);
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbClose(pDb);
tdbTbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
TEST(tdb_test, simple_insert2) {
int ret;
TENV *pEnv;
TDB *pDb;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1000000;
TXN txn;
......@@ -246,12 +246,12 @@ TEST(tdb_test, simple_insert2) {
taosRemoveDir("tdb");
// Open Env
ret = tdbEnvOpen("tdb", 1024, 10, &pEnv);
ret = tdbOpen("tdb", 1024, 10, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tDefaultKeyCmpr;
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
......@@ -271,24 +271,24 @@ TEST(tdb_test, simple_insert2) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // Iterate to query the DB data
TDBC *pDBC;
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbDbcOpen(pDb, &pDBC, NULL);
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbDbcMoveToFirst(pDBC);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
......@@ -300,7 +300,7 @@ TEST(tdb_test, simple_insert2) {
GTEST_ASSERT_EQ(count, nData);
tdbDbcClose(pDBC);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
......@@ -311,29 +311,29 @@ TEST(tdb_test, simple_insert2) {
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
ret = tdbDrop(pDb);
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbClose(pDb);
tdbTbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
TEST(tdb_test, simple_delete1) {
int ret;
TDB *pDb;
TTB *pDb;
char key[128];
char data[128];
TXN txn;
TENV *pEnv;
TDB *pEnv;
SPoolMem *pPool;
void *pKey = NULL;
void *pData = NULL;
int nKey;
TDBC *pDbc;
TBC *pDbc;
int nData;
int nKV = 69;
......@@ -342,11 +342,11 @@ TEST(tdb_test, simple_delete1) {
pPool = openPool();
// open env
ret = tdbEnvOpen("tdb", 1024, 256, &pEnv);
ret = tdbOpen("tdb", 1024, 256, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
......@@ -356,7 +356,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -365,7 +365,7 @@ TEST(tdb_test, simple_delete1) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0);
}
......@@ -374,7 +374,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = nKV - 1; iData > 30; iData--) {
sprintf(key, "key%d", iData);
ret = tdbDelete(pDb, key, strlen(key), &txn);
ret = tdbTbDelete(pDb, key, strlen(key), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -382,7 +382,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
if (iData <= 30) {
GTEST_ASSERT_EQ(ret, 0);
} else {
......@@ -391,15 +391,15 @@ TEST(tdb_test, simple_delete1) {
}
// loop to iterate the data
tdbDbcOpen(pDb, &pDbc, NULL);
tdbTbcOpen(pDb, &pDbc, NULL);
ret = tdbDbcMoveToFirst(pDbc);
ret = tdbTbcMoveToFirst(pDbc);
GTEST_ASSERT_EQ(ret, 0);
pKey = NULL;
pData = NULL;
for (;;) {
ret = tdbDbcNext(pDbc, &pKey, &nKey, &pData, &nData);
ret = tdbTbcNext(pDbc, &pKey, &nKey, &pData, &nData);
if (ret < 0) break;
std::cout.write((char *)pKey, nKey) /* << " " << kLen */ << " ";
......@@ -407,20 +407,20 @@ TEST(tdb_test, simple_delete1) {
std::cout << std::endl;
}
tdbDbcClose(pDbc);
tdbTbcClose(pDbc);
tdbCommit(pEnv, &txn);
closePool(pPool);
tdbClose(pDb);
tdbEnvClose(pEnv);
tdbTbClose(pDb);
tdbClose(pEnv);
}
TEST(tdb_test, simple_upsert1) {
int ret;
TENV *pEnv;
TDB *pDb;
TDB *pEnv;
TTB *pDb;
int nData = 100000;
char key[64];
char data[64];
......@@ -431,11 +431,11 @@ TEST(tdb_test, simple_upsert1) {
taosRemoveDir("tdb");
// open env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
ret = tdbOpen("tdb", 4096, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
pPool = openPool();
......@@ -446,7 +446,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -454,7 +454,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
......@@ -463,7 +463,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
......@@ -473,11 +473,11 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbTbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
tdbClose(pDb);
tdbEnvClose(pEnv);
tdbTbClose(pDb);
tdbClose(pEnv);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册