提交 48207a16 编写于 作者: H Hongze Cheng

refact vnode and add tdb api

上级 496ec4c1
...@@ -47,66 +47,66 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -47,66 +47,66 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
// open env // open env
ret = tdbEnvOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv); ret = tdbEnvOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pTbDb // open pTbDb
ret = tdbDbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); ret = tdbDbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pSkmDb // open pSkmDb
ret = tdbDbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); ret = tdbDbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pUidIdx // open pUidIdx
ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pNameIdx // open pNameIdx
ret = tdbDbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); ret = tdbDbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pCtbIdx // open pCtbIdx
ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pTagIdx // open pTagIdx
ret = tdbDbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); ret = tdbDbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open pTtlIdx // open pTtlIdx
ret = tdbDbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); ret = tdbDbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
if (ret < 0) { if (ret < 0) {
metaError("vgId: %d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open index // open index
if (metaOpenIdx(pMeta) < 0) { if (metaOpenIdx(pMeta) < 0) {
metaError("vgId: %d failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
metaDebug("vgId: %d meta is opened", TD_VID(pVnode)); metaDebug("vgId:%d meta is opened", TD_VID(pVnode));
*ppMeta = pMeta; *ppMeta = pMeta;
return 0; return 0;
......
...@@ -45,7 +45,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { ...@@ -45,7 +45,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
goto _err; goto _err;
} }
tsdbDebug("vgId: %d tsdb is opened", TD_VID(pVnode)); tsdbDebug("vgId:%d tsdb is opened", TD_VID(pVnode));
*ppTsdb = pTsdb; *ppTsdb = pTsdb;
return 0; return 0;
......
...@@ -42,13 +42,13 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -42,13 +42,13 @@ int vnodeBegin(SVnode *pVnode) {
// begin meta // begin meta
if (metaBegin(pVnode->pMeta) < 0) { if (metaBegin(pVnode->pMeta) < 0) {
vError("vgId: %d failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
// begin tsdb // begin tsdb
if (tsdbBegin(pVnode->pTsdb) < 0) { if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId: %d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -93,7 +93,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -93,7 +93,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary // free info binary
taosMemoryFree(data); taosMemoryFree(data);
vInfo("vgId: %d vnode info is saved, fname: %s", pInfo->config.vgId, fname); vInfo("vgId:%d vnode info is saved, fname: %s", pInfo->config.vgId, fname);
return 0; return 0;
...@@ -115,7 +115,7 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -115,7 +115,7 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
return -1; return -1;
} }
vInfo("vgId: %d vnode info is committed", pInfo->config.vgId); vInfo("vgId:%d vnode info is committed", pInfo->config.vgId);
return 0; return 0;
} }
......
...@@ -23,13 +23,13 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -23,13 +23,13 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
// check config // check config
if (vnodeCheckCfg(pCfg) < 0) { if (vnodeCheckCfg(pCfg) < 0) {
vError("vgId: %d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
return -1; return -1;
} }
// create vnode env // create vnode env
if (tfsMkdir(pTfs, path) < 0) { if (tfsMkdir(pTfs, path) < 0) {
vError("vgId: %d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
return -1; return -1;
} }
...@@ -39,11 +39,11 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -39,11 +39,11 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.state.applied = -1; info.state.applied = -1;
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId: %d failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
return -1; return -1;
} }
vInfo("vgId: %d vnode is created", pCfg->vgId); vInfo("vgId:%d vnode is created", pCfg->vgId);
return 0; return 0;
} }
...@@ -70,7 +70,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -70,7 +70,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1); pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
if (pVnode == NULL) { if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("vgId: %d failed to open vnode since %s", info.config.vgId, tstrerror(terrno)); vError("vgId:%d failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
return NULL; return NULL;
} }
...@@ -85,19 +85,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -85,19 +85,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open buffer pool // open buffer pool
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
vError("vgId: %d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open meta // open meta
if (metaOpen(pVnode, &pVnode->pMeta) < 0) { if (metaOpen(pVnode, &pVnode->pMeta) < 0) {
vError("vgId: %d failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open tsdb // open tsdb
if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) { if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) {
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
...@@ -105,7 +105,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -105,7 +105,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
vError("vgId: %d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
...@@ -113,25 +113,25 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -113,25 +113,25 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal); pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
vError("vgId: %d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// vnode begin // vnode begin
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
vError("vgId: %d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
// open sync // open sync
if (vnodeSyncOpen(pVnode, dir)) { if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId: %d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
......
...@@ -52,7 +52,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -52,7 +52,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
int len; int len;
int ret; int ret;
vTrace("vgId: %d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vTrace("vgId:%d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version); version);
pVnode->state.applied = version; pVnode->state.applied = version;
...@@ -62,7 +62,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -62,7 +62,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
len = pMsg->contLen - sizeof(SMsgHead); len = pMsg->contLen - sizeof(SMsgHead);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -119,7 +119,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -119,7 +119,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
break; break;
} }
vDebug("vgId: %d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
...@@ -134,7 +134,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -134,7 +134,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
return 0; return 0;
_err: _err:
vDebug("vgId: %d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vDebug("vgId:%d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), version); tstrerror(terrno), version);
return -1; return -1;
} }
......
...@@ -45,17 +45,18 @@ int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); ...@@ -45,17 +45,18 @@ int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// TDBC // TDBC
#define TDB_FLG_CMP_LT 0x1 // less than
#define TDB_FLG_CMP_EQ 0x2 // equal
#define TDB_FLG_CMP_GT 0x4 // greater than
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn); int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn);
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen); int tdbDbcClose(TDBC *pDbc);
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, int *c);
int tdbDbcMoveToFirst(TDBC *pDbc);
int tdbDbcMoveToLast(TDBC *pDbc);
int tdbDbcMoveToNext(TDBC *pDbc);
int tdbDbcMoveToPrev(TDBC *pDbc);
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen); int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen);
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen); int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbDbcDrop(TDBC *pDbc); int tdbDbcDrop(TDBC *pDbc);
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbDbcClose(TDBC *pDbc);
// TXN // TXN
#define TDB_TXN_WRITE 0x1 #define TDB_TXN_WRITE 0x1
......
...@@ -1310,13 +1310,10 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -1310,13 +1310,10 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret; int ret;
int nCells; int nCells;
int c; int c;
SBTree *pBt;
SCell *pCell; SCell *pCell;
SPager *pPager;
SCellDecoder cd = {0}; SCellDecoder cd = {0};
SBTree *pBt = pBtc->pBt;
pBt = pBtc->pBt; SPager *pPager = pBt->pPager;
pPager = pBt->pPager;
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
......
...@@ -111,8 +111,15 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn) { ...@@ -111,8 +111,15 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn) {
return 0; return 0;
} }
int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen) { int tdbDbcMoveTo(TDBC *pDbc, const void *pKey, int kLen, int *c) { return tdbBtcMoveTo(&pDbc->btc, pKey, kLen, c); }
// return tdbBtcMoveTo(&pDbc->btc, pKey, kLen, flags);
int tdbDbcMoveToFirst(TDBC *pDbc) { return tdbBtcMoveToFirst(&pDbc->btc); }
int tdbDbcMoveToLast(TDBC *pDbc) { return tdbBtcMoveToLast(&pDbc->btc); }
int tdbDbcMoveToNext(TDBC *pDbc) { return 0; }
int tdbDbcMoveToPrev(TDBC *pDbc) {
// TODO
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册