未验证 提交 43d8df07 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11077 from taosdata/feature/shm

shm
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SRpcMsg SRpcMsg; typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet; typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum { typedef enum {
QUERY_QUEUE, QUERY_QUEUE,
FETCH_QUEUE, FETCH_QUEUE,
...@@ -38,10 +39,11 @@ typedef enum { ...@@ -38,10 +39,11 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
...@@ -51,14 +53,16 @@ typedef struct { ...@@ -51,14 +53,16 @@ typedef struct {
SendMnodeReqFp sendMnodeReqFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
} SMsgCb; } SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -107,20 +107,20 @@ void rpcClose(void *); ...@@ -107,20 +107,20 @@ void rpcClose(void *);
void * rpcMallocCont(int contLen); void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen); void * rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
// These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
// just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle, int8_t type); //
void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc); ...@@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
bool taosProcIsChild(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc);
int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { ...@@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
} }
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
} }
...@@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { ...@@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
} }
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
}
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) {
(*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type);
} }
\ No newline at end of file
...@@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc ...@@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
......
...@@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode); ...@@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { ...@@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pMgmt->clientRpc == NULL) { if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE; terrno = TSDB_CODE_DND_OFFLINE;
return -1; return -1;
...@@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
return 0; return 0;
} }
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
} else { } else {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { ...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
} }
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
if (pDnodeWrapper != NULL) { if (pDnodeWrapper != NULL) {
...@@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { ...@@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
} }
} }
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1; int32_t code = -1;
do { do {
......
...@@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper); ...@@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd ...@@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
SEpSet epSet = {0}; SEpSet epSet = {0};
......
...@@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
} }
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
ProcFuncType funcType) { int32_t rawBodyLen, ProcFuncType funcType) {
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
...@@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) {
} }
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType funcType) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType funcType) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册