提交 1363021f 编写于 作者: S Shengliang Guan

shm

上级 17215ffe
...@@ -49,20 +49,21 @@ typedef struct { ...@@ -49,20 +49,21 @@ typedef struct {
} SWrapperCfg; } SWrapperCfg;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t refCount; int32_t refCount;
int32_t vgVersion; int32_t vgVersion;
int8_t dropped; int8_t dropped;
int8_t accessState; int8_t accessState;
uint64_t dbUid; uint64_t dbUid;
char *db; char *db;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
STaosQueue *pWriteQ; STaosQueue *pWriteQ;
STaosQueue *pSyncQ; STaosQueue *pSyncQ;
STaosQueue *pApplyQ; STaosQueue *pApplyQ;
STaosQueue *pQueryQ; STaosQueue *pQueryQ;
STaosQueue *pFetchQ; STaosQueue *pFetchQ;
SMgmtWrapper *pWrapper;
} SVnodeObj; } SVnodeObj;
typedef struct { typedef struct {
......
...@@ -56,6 +56,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -56,6 +56,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->refCount = 0; pVnode->refCount = 0;
pVnode->dropped = 0; pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pWrapper = pMgmt->pWrapper;
pVnode->pImpl = pImpl; pVnode->pImpl = pImpl;
pVnode->vgVersion = pCfg->vgVersion; pVnode->vgVersion = pCfg->vgVersion;
pVnode->dbUid = pCfg->dbUid; pVnode->dbUid = pCfg->dbUid;
...@@ -127,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) { ...@@ -127,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes); pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pMgmt = pMgmt, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnodeCfg cfg = {.pWrapper = pMgmt->pWrapper, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
......
...@@ -82,7 +82,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -82,7 +82,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1; return -1;
} }
vnodeCfg.pMgmt = pMgmt; vnodeCfg.pWrapper = pMgmt->pWrapper;
vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.pTfs = pMgmt->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid; vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
......
...@@ -46,12 +46,12 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO ...@@ -46,12 +46,12 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle; pRsp->ahandle = pRpc->ahandle;
rpcSendResponse(pRsp); dndSendRsp(pVnode->pWrapper, pRsp);
free(pRsp); free(pRsp);
} else { } else {
if (code != 0) code = terrno; if (code != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
rpcSendResponse(&rpcRsp); dndSendRsp(pVnode->pWrapper, &rpcRsp);
} }
} }
...@@ -236,7 +236,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -236,7 +236,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
if (msgType & 1u) { if (msgType & 1u) {
if (code != 0) code = terrno; if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
rpcSendResponse(&rsp); dndSendRsp(pMgmt->pWrapper, &rsp);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......
...@@ -40,7 +40,7 @@ typedef struct { ...@@ -40,7 +40,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
uint64_t dbId; uint64_t dbId;
void *pMgmt; void *pWrapper;
STfs *pTfs; STfs *pTfs;
uint64_t wsize; uint64_t wsize;
uint64_t ssize; uint64_t ssize;
......
...@@ -78,14 +78,14 @@ struct SVnode { ...@@ -78,14 +78,14 @@ struct SVnode {
SWal* pWal; SWal* pWal;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
void* pMgmt; void* pWrapper;
STfs* pTfs; STfs* pTfs;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
#define vFatal(...) \ #define vFatal(...) \
do { \ do { \
......
...@@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnodeCfg cfg = defaultVnodeOptions; SVnodeCfg cfg = defaultVnodeOptions;
if (pVnodeCfg != NULL) { if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId; cfg.vgId = pVnodeCfg->vgId;
cfg.pMgmt = pVnodeCfg->pMgmt; cfg.pWrapper = pVnodeCfg->pWrapper;
cfg.pTfs = pVnodeCfg->pTfs; cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId; cfg.dbId = pVnodeCfg->dbId;
cfg.hashBegin = pVnodeCfg->hashBegin; cfg.hashBegin = pVnodeCfg->hashBegin;
...@@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { ...@@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
} }
pVnode->vgId = pVnodeCfg->vgId; pVnode->vgId = pVnodeCfg->vgId;
pVnode->pMgmt = pVnodeCfg->pMgmt; pVnode->pWrapper = pVnodeCfg->pWrapper;
pVnode->pTfs = pVnodeCfg->pTfs; pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
......
...@@ -89,16 +89,16 @@ int vnodeScheduleTask(SVnodeTask* pTask) { ...@@ -89,16 +89,16 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0; return 0;
} }
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) { if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) {
terrno = TSDB_CODE_VND_APP_ERROR; terrno = TSDB_CODE_VND_APP_ERROR;
return -1; return -1;
} }
return (*vnodeMgr.putToQueryQFp)(pVnode->pMgmt, pReq); return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
} }
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
(*vnodeMgr.sendReqFp)(pVnode->pMgmt, epSet, pReq); (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
......
...@@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode,
(putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqFp)vnodeSendReqToDnode); (putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq);
} }
void vnodeQueryClose(SVnode *pVnode) { void vnodeQueryClose(SVnode *pVnode) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册